You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/30 10:48:22 UTC
[airflow] 02/05: Reload gunicorn when plugins have been changed
(#8997)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 36f79a9b227992f2efd6dca0ae7d4ad15dc8d2e6
Author: Kamil Breguła <ka...@polidea.com>
AuthorDate: Mon Jun 29 20:38:26 2020 +0200
Reload gunicorn when plugins have been changed (#8997)
(cherry picked from commit 1c48ffbe25c3e304660b7e75a49e88bd114dde46)
---
airflow/bin/cli.py | 350 +++++++++++++++++++--------
airflow/config_templates/config.yml | 8 +
airflow/config_templates/default_airflow.cfg | 4 +
tests/cli/test_cli.py | 233 +++++++++++++++---
4 files changed, 452 insertions(+), 143 deletions(-)
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 8b5522c..b81c7b1 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -20,6 +20,7 @@
from __future__ import print_function
import errno
+import hashlib
import importlib
import locale
import logging
@@ -777,31 +778,11 @@ def clear(args):
)
-def get_num_ready_workers_running(gunicorn_master_proc):
- workers = psutil.Process(gunicorn_master_proc.pid).children()
-
- def ready_prefix_on_cmdline(proc):
- try:
- cmdline = proc.cmdline()
- if len(cmdline) > 0:
- return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
- except psutil.NoSuchProcess:
- pass
- return False
-
- ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)]
- return len(ready_workers)
-
-
-def get_num_workers_running(gunicorn_master_proc):
- workers = psutil.Process(gunicorn_master_proc.pid).children()
- return len(workers)
-
-
-def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
+class GunicornMonitor(LoggingMixin):
"""
Runs forever, monitoring the child processes of @gunicorn_master_proc and
- restarting workers occasionally.
+ restarting workers occasionally or when files in the plug-in directory
+ have been modified.
Each iteration of the loop traverses one edge of this state transition
diagram, where each state (node) represents
[ num_ready_workers_running / num_workers_running ]. We expect most time to
@@ -818,92 +799,245 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
master process, which increases and decreases the number of child workers
respectively. Gunicorn guarantees that on TTOU workers are terminated
gracefully and that the oldest worker is terminated.
+
+ :param gunicorn_master_proc: handle for the main Gunicorn process
+ :param num_workers_expected: Number of workers to run the Gunicorn web server
+ :param master_timeout: Number of seconds the webserver waits before killing gunicorn master that
+ doesn't respond
+ :param worker_refresh_interval: Number of seconds to wait before refreshing a batch of workers.
+ A value of 0 or less disables the periodic refresh.
+ :param worker_refresh_batch_size: Number of workers to refresh at a time. When set to 0, worker
+ refresh is disabled. When nonzero, airflow periodically refreshes webserver workers by
+ bringing up new ones and killing old ones.
+ :param reload_on_plugin_change: If set to True, Airflow will track files in the plugins_folder directory.
+ When it detects changes, it reloads gunicorn once the folder has stopped changing.
"""
+ def __init__(
+ self,
+ gunicorn_master_proc,
+ num_workers_expected,
+ master_timeout,
+ worker_refresh_interval,
+ worker_refresh_batch_size,
+ reload_on_plugin_change
+ ):
+ super(GunicornMonitor, self).__init__()
+ self.gunicorn_master_proc = gunicorn_master_proc
+ self.num_workers_expected = num_workers_expected
+ self.master_timeout = master_timeout
+ self.worker_refresh_interval = worker_refresh_interval
+ self.worker_refresh_batch_size = worker_refresh_batch_size
+ self.reload_on_plugin_change = reload_on_plugin_change
+
+ # NOTE(review): these two counters are not read by _check_workers, which recomputes the
+ # counts locally on every iteration - possibly unused.
+ self._num_workers_running = 0
+ self._num_ready_workers_running = 0
+ # Time of the last periodic refresh; None (interval <= 0) keeps the refresh branch disabled.
+ self._last_refresh_time = time.time() if worker_refresh_interval > 0 else None
+ # Snapshot {path: content hash} of the plugins folder; None when plugin tracking is off.
+ self._last_plugin_state = self._generate_plugin_state() if reload_on_plugin_change else None
+ # Set when a change was seen; gunicorn is reloaded only if the next check finds no further change.
+ self._restart_on_next_plugin_check = False
+
+ def _generate_plugin_state(self):
+ """
+ Generate dict mapping the path of every file under settings.PLUGINS_FOLDER (searched
+ recursively) to the MD5 hash of its contents.
+
+ A file that disappears between listing the directory and hashing it (e.g. an editor
+ swap file) is skipped instead of raising: an OSError escaping from here reaches the
+ monitor loop, which treats it as fatal and shuts the webserver down.
+ """
+ if not settings.PLUGINS_FOLDER:
+ return {}
+
+ all_filenames = []
+ for (root, _, filenames) in os.walk(settings.PLUGINS_FOLDER):
+ all_filenames.extend(os.path.join(root, f) for f in filenames)
+
+ plugin_state = {}
+ for filename in sorted(all_filenames):
+ try:
+ plugin_state[filename] = self._get_file_hash(filename)
+ except (IOError, OSError) as err:
+ # Deleted or renamed after os.walk listed it; its absence from the
+ # returned state is itself detected as a change.
+ if err.errno != errno.ENOENT:
+ raise
+ return plugin_state
+
+ @staticmethod
+ def _get_file_hash(fname):
+ """
+ Calculate the MD5 hex digest of a file's contents, reading it in 4096-byte chunks.
+
+ The digest is only compared against earlier digests to detect content changes.
+ """
+ hash_md5 = hashlib.md5()
+ with open(fname, "rb") as f:
+ for chunk in iter(lambda: f.read(4096), b""):
+ hash_md5.update(chunk)
+ return hash_md5.hexdigest()
+
+ def _get_num_ready_workers_running(self):
+ """
+ Returns the number of child processes of the Gunicorn master whose first cmdline element
+ contains settings.GUNICORN_WORKER_READY_PREFIX.
+ """
+ workers = psutil.Process(self.gunicorn_master_proc.pid).children()
+
+ def ready_prefix_on_cmdline(proc):
+ try:
+ cmdline = proc.cmdline()
+ # An empty cmdline (e.g. a zombie process) counts as not ready.
+ if len(cmdline) > 0: # pylint: disable=len-as-condition
+ return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
+ except psutil.NoSuchProcess:
+ # The worker exited between children() and cmdline(); count it as not ready.
+ pass
+ return False
+
+ ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)]
+ return len(ready_workers)
- def wait_until_true(fn, timeout=0):
+ def _get_num_workers_running(self):
+ """Returns the number of direct child processes of the Gunicorn master, ready or not"""
+ workers = psutil.Process(self.gunicorn_master_proc.pid).children()
+ return len(workers)
+
+ def _wait_until_true(self, fn, timeout=0):
"""
Sleeps until fn is true
"""
+ # fn is polled every 0.1 s. A timeout of 0 or less waits forever; otherwise
+ # AirflowWebServerTimeout is raised once `timeout` seconds have elapsed.
- t = time.time()
+ start_time = time.time()
while not fn():
- if 0 < timeout and timeout <= time.time() - t:
+ if 0 < timeout <= time.time() - start_time:
raise AirflowWebServerTimeout(
- "No response from gunicorn master within {0} seconds"
- .format(timeout))
+ "No response from gunicorn master within {0} seconds".format(timeout)
+ )
time.sleep(0.1)
- def start_refresh(gunicorn_master_proc):
- batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
- log.debug('%s doing a refresh of %s workers', state, batch_size)
- sys.stdout.flush()
- sys.stderr.flush()
-
+ def _spawn_new_workers(self, count):
+ """
+ Ask gunicorn to start additional workers, one TTIN signal at a time, waiting after each
+ signal until the master reports num_workers_expected plus the workers requested so far
+ (AirflowWebServerTimeout is raised after master_timeout seconds).
+ :param count: The number of workers to spawn
+ """
excess = 0
- for _ in range(batch_size):
- gunicorn_master_proc.send_signal(signal.SIGTTIN)
+ for _ in range(count):
+ # TTIN: Increment the number of processes by one
+ self.gunicorn_master_proc.send_signal(signal.SIGTTIN)
excess += 1
- wait_until_true(lambda: num_workers_expected + excess ==
- get_num_workers_running(gunicorn_master_proc),
- master_timeout)
+ # The lambda reads `excess` when called, i.e. after this iteration's increment.
+ self._wait_until_true(
+ lambda: self.num_workers_expected + excess == self._get_num_workers_running(),
+ timeout=self.master_timeout
+ )
- try:
- wait_until_true(lambda: num_workers_expected ==
- get_num_workers_running(gunicorn_master_proc),
- master_timeout)
- while True:
- num_workers_running = get_num_workers_running(gunicorn_master_proc)
- num_ready_workers_running = \
- get_num_ready_workers_running(gunicorn_master_proc)
-
- state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running)
-
- # Whenever some workers are not ready, wait until all workers are ready
- if num_ready_workers_running < num_workers_running:
- log.debug('%s some workers are starting up, waiting...', state)
- sys.stdout.flush()
+ def _kill_old_workers(self, count):
+ """
+ Ask gunicorn to gracefully stop workers, one TTOU signal at a time, waiting after each
+ signal until the running worker count drops by one (AirflowWebServerTimeout is raised
+ after master_timeout seconds).
+ :param count: The number of workers to kill
+ """
+ for _ in range(count):
+ # range(count) was evaluated once, so this does not shorten the loop; `count` becomes
+ # the number of excess workers still expected after this signal (read by the lambda).
+ count -= 1
+ # TTOU: Decrement the number of processes by one
+ self.gunicorn_master_proc.send_signal(signal.SIGTTOU)
+ self._wait_until_true(
+ lambda: self.num_workers_expected + count == self._get_num_workers_running(),
+ timeout=self.master_timeout)
+
+ def _reload_gunicorn(self):
+ """
+ Send signal to reload the gunicorn configuration. When gunicorn receives the signal, it
+ reloads the configuration, starts new worker processes with the new configuration and
+ gracefully shuts down the older workers.
+ """
+ # HUP: Reload the configuration.
+ self.gunicorn_master_proc.send_signal(signal.SIGHUP)
+ # NOTE(review): presumably gives the master time to start replacing workers before the
+ # worker-count check below - confirm.
+ time.sleep(1)
+ self._wait_until_true(
+ lambda: self.num_workers_expected == self._get_num_workers_running(),
+ timeout=self.master_timeout
+ )
+
+ def start(self):
+ """
+ Starts monitoring the webserver.
+
+ Loops until the gunicorn master exits (then exits with its return code). On
+ AirflowWebServerTimeout or OSError the master is terminated and the process exits with 1.
+ """
+ try: # pylint: disable=too-many-nested-blocks
+ # Wait for the initial set of workers before starting the monitoring loop.
+ self._wait_until_true(
+ lambda: self.num_workers_expected == self._get_num_workers_running(),
+ timeout=self.master_timeout
+ )
+ while True:
+ if self.gunicorn_master_proc.poll() is not None:
+ sys.exit(self.gunicorn_master_proc.returncode)
+ self._check_workers()
+ # Throttle loop
time.sleep(1)
- # Kill a worker gracefully by asking gunicorn to reduce number of workers
- elif num_workers_running > num_workers_expected:
- excess = num_workers_running - num_workers_expected
- log.debug('%s killing %s workers', state, excess)
-
- for _ in range(excess):
- gunicorn_master_proc.send_signal(signal.SIGTTOU)
- excess -= 1
- wait_until_true(lambda: num_workers_expected + excess ==
- get_num_workers_running(gunicorn_master_proc),
- master_timeout)
-
- # Start a new worker by asking gunicorn to increase number of workers
- elif num_workers_running == num_workers_expected:
- refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
- log.debug(
- '%s sleeping for %ss starting doing a refresh...',
- state, refresh_interval
+ except (AirflowWebServerTimeout, OSError) as err:
+ self.log.error(err)
+ self.log.error("Shutting down webserver")
+ try:
+ self.gunicorn_master_proc.terminate()
+ self.gunicorn_master_proc.wait()
+ finally:
+ sys.exit(1)
+
+ def _check_workers(self):
+ """
+ Run one iteration of the monitoring loop: wait while workers are starting, trim excess
+ workers, replace missing workers, periodically refresh a batch of workers, and reload
+ gunicorn once the plugins folder has changed and then stayed unchanged for one check.
+ """
+ num_workers_running = self._get_num_workers_running()
+ num_ready_workers_running = self._get_num_ready_workers_running()
+
+ # Whenever some workers are not ready, wait until all workers are ready
+ if num_ready_workers_running < num_workers_running:
+ self.log.debug(
+ '[%d / %d] Some workers are starting up, waiting...',
+ num_ready_workers_running, num_workers_running
+ )
+ time.sleep(1)
+ return
+
+ # If there are too many workers, then kill a worker gracefully by asking gunicorn to reduce
+ # number of workers
+ if num_workers_running > self.num_workers_expected:
+ excess = min(num_workers_running - self.num_workers_expected, self.worker_refresh_batch_size)
+ self.log.debug(
+ '[%d / %d] Killing %s workers', num_ready_workers_running, num_workers_running, excess
+ )
+ self._kill_old_workers(excess)
+ return
+
+ # If there are too few workers, start a new worker by asking gunicorn
+ # to increase number of workers
+ if num_workers_running < self.num_workers_expected:
+ self.log.error(
+ "[%d / %d] Some workers seem to have died and gunicorn did not restart "
+ "them as expected",
+ num_ready_workers_running, num_workers_running
+ )
+ time.sleep(10)
+ num_workers_running = self._get_num_workers_running()
+ if num_workers_running < self.num_workers_expected:
+ # Spawn only the missing workers, and at most one batch per iteration.
+ new_worker_count = min(
+ self.num_workers_expected - num_workers_running, self.worker_refresh_batch_size
+ )
+ self.log.debug(
+ '[%d / %d] Spawning %d workers',
+ num_ready_workers_running, num_workers_running, new_worker_count
+ )
+ self._spawn_new_workers(new_worker_count)
+ return
+
+ # Now the number of running and expected worker should be equal
+
+ # If workers should be restarted periodically.
+ if self.worker_refresh_interval > 0 and self._last_refresh_time:
+ # and we refreshed the workers a long time ago, refresh the workers
+ last_refresh_diff = (time.time() - self._last_refresh_time)
+ if self.worker_refresh_interval < last_refresh_diff:
+ num_new_workers = self.worker_refresh_batch_size
+ self.log.debug(
+ '[%d / %d] Starting doing a refresh. Starting %d workers.',
+ num_ready_workers_running, num_workers_running, num_new_workers
)
- time.sleep(refresh_interval)
- start_refresh(gunicorn_master_proc)
+ self._spawn_new_workers(num_new_workers)
+ self._last_refresh_time = time.time()
+ return
- else:
- # num_ready_workers_running == num_workers_running < num_workers_expected
- log.error((
- "%s some workers seem to have died and gunicorn"
- "did not restart them as expected"
- ), state)
- time.sleep(10)
- if len(
- psutil.Process(gunicorn_master_proc.pid).children()
- ) < num_workers_expected:
- start_refresh(gunicorn_master_proc)
- except (AirflowWebServerTimeout, OSError) as err:
- log.error(err)
- log.error("Shutting down webserver")
- try:
- gunicorn_master_proc.terminate()
- gunicorn_master_proc.wait()
- finally:
- sys.exit(1)
+ # if we should check the directory with the plugin,
+ if self.reload_on_plugin_change:
+ # compare the previous and current contents of the directory
+ new_state = self._generate_plugin_state()
+ # If changed, wait until its content is fully saved.
+ if new_state != self._last_plugin_state:
+ self.log.debug(
+ '[%d / %d] Plugins folder changed. The gunicorn will be restarted the next time the '
+ 'plugin directory is checked, if there is no change in it.',
+ num_ready_workers_running, num_workers_running
+ )
+ self._restart_on_next_plugin_check = True
+ self._last_plugin_state = new_state
+ elif self._restart_on_next_plugin_check:
+ self.log.debug(
+ '[%d / %d] Starts reloading the gunicorn configuration.',
+ num_ready_workers_running, num_workers_running
+ )
+ self._restart_on_next_plugin_check = False
+ self._last_refresh_time = time.time()
+ self._reload_gunicorn()
@cli_utils.action_logging
@@ -962,13 +1096,13 @@ def webserver(args):
run_args = [
'gunicorn',
- '-w', str(num_workers),
- '-k', str(args.workerclass),
- '-t', str(worker_timeout),
- '-b', args.hostname + ':' + str(args.port),
- '-n', 'airflow-webserver',
- '-p', str(pid),
- '-c', 'python:airflow.www.gunicorn_config',
+ # gunicorn options are spelled in their long form.
+ '--workers', str(num_workers),
+ '--worker-class', str(args.workerclass),
+ '--timeout', str(worker_timeout),
+ '--bind', args.hostname + ':' + str(args.port),
+ '--name', 'airflow-webserver',
+ '--pid', str(pid),
+ '--config', 'python:airflow.www.gunicorn_config',
]
if args.access_logfile:
@@ -978,7 +1112,7 @@ def webserver(args):
run_args += ['--error-logfile', str(args.error_logfile)]
if args.daemon:
- run_args += ['-D']
+ # gunicorn's long-form spelling of -D.
+ run_args += ['--daemon']
if ssl_cert:
run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]
@@ -995,12 +1129,14 @@ def webserver(args):
def monitor_gunicorn(gunicorn_master_proc):
# These run forever until SIG{INT, TERM, KILL, ...} signal is sent
- if conf.getint('webserver', 'worker_refresh_interval') > 0:
- master_timeout = conf.getint('webserver', 'web_server_master_timeout')
- restart_workers(gunicorn_master_proc, num_workers, master_timeout)
- else:
- while True:
- time.sleep(1)
+ # reload_on_plugin_change is declared as a boolean option (default False), so it is read
+ # with getboolean; getint would reject the documented True/False values.
+ GunicornMonitor(
+ gunicorn_master_proc=gunicorn_master_proc,
+ num_workers_expected=num_workers,
+ master_timeout=conf.getint('webserver', 'web_server_master_timeout'),
+ worker_refresh_interval=conf.getint('webserver', 'worker_refresh_interval', fallback=30),
+ worker_refresh_batch_size=conf.getint('webserver', 'worker_refresh_batch_size', fallback=1),
+ reload_on_plugin_change=conf.getboolean('webserver', 'reload_on_plugin_change', fallback=False),
+ ).start()
if args.daemon:
base, ext = os.path.splitext(pid)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index e32b8cc..1f697b1 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -699,6 +699,14 @@
type: string
example: ~
default: "30"
+ - name: reload_on_plugin_change
+ description: |
+ If set to True, Airflow will track files in the plugins_folder directory. When it detects
+ changes, it reloads gunicorn.
+ version_added: ~
+ type: boolean
+ example: ~
+ default: "False"
- name: secret_key
description: |
Secret key used to run your flask app
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index c75d3ae..cfc92ad 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -341,6 +341,10 @@ worker_refresh_batch_size = 1
# Number of seconds to wait before refreshing a batch of workers.
worker_refresh_interval = 30
+# If set to True, Airflow will track files in the plugins_folder directory. When it detects
+# changes, it reloads gunicorn.
+reload_on_plugin_change = False
+
# Secret key used to run your flask app
# It should be as random as possible
secret_key = temporary_key
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index a2a81ac..5748057 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.
import contextlib
+import errno
import io
import logging
@@ -28,8 +29,8 @@ from six import StringIO, PY2
import sys
from datetime import datetime, timedelta, time
-from mock import patch, Mock, MagicMock
-from time import sleep
+from mock import patch, MagicMock
+from time import sleep, time as timetime
import psutil
import pytz
import subprocess
@@ -37,9 +38,10 @@ import pytest
from argparse import Namespace
from airflow import settings
import airflow.bin.cli as cli
-from airflow.bin.cli import get_num_ready_workers_running, run, get_dag
+from airflow.bin.cli import run, get_dag
from airflow.models import TaskInstance
from airflow.utils import timezone
+from airflow.utils.file import TemporaryDirectory
from airflow.utils.state import State
from airflow.settings import Session
from airflow import models
@@ -154,39 +156,6 @@ class TestCLI(unittest.TestCase):
cls.dagbag = models.DagBag(include_examples=True)
cls.parser = cli.CLIFactory.get_parser()
- def setUp(self):
- self.gunicorn_master_proc = Mock(pid=None)
- self.children = MagicMock()
- self.child = MagicMock()
- self.process = MagicMock()
-
- def test_ready_prefix_on_cmdline(self):
- self.child.cmdline.return_value = [settings.GUNICORN_WORKER_READY_PREFIX]
- self.process.children.return_value = [self.child]
-
- with patch('psutil.Process', return_value=self.process):
- self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 1)
-
- def test_ready_prefix_on_cmdline_no_children(self):
- self.process.children.return_value = []
-
- with patch('psutil.Process', return_value=self.process):
- self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
- def test_ready_prefix_on_cmdline_zombie(self):
- self.child.cmdline.return_value = []
- self.process.children.return_value = [self.child]
-
- with patch('psutil.Process', return_value=self.process):
- self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
- def test_ready_prefix_on_cmdline_dead_process(self):
- self.child.cmdline.side_effect = psutil.NoSuchProcess(11347)
- self.process.children.return_value = [self.child]
-
- with patch('psutil.Process', return_value=self.process):
- self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
def test_cli_webserver_debug(self):
env = os.environ.copy()
p = psutil.Popen(["airflow", "webserver", "-d"], env=env)
@@ -847,3 +816,195 @@ class TestShowInfo(unittest.TestCase):
self.assertIn("https://file.io/TEST", temp_stdout.getvalue())
content = mock_requests.post.call_args[1]["files"]["file"][1]
self.assertIn("postgresql+psycopg2://p...s:PASSWORD@postgres/airflow", content)
+
+
+class TestGunicornMonitor(unittest.TestCase):
+ """Tests for GunicornMonitor._check_workers with the process-level helpers mocked out."""
+
+ def setUp(self):
+ self.gunicorn_master_proc = mock.Mock(pid=2137)
+ self.monitor = cli.GunicornMonitor(
+ gunicorn_master_proc=self.gunicorn_master_proc,
+ num_workers_expected=4,
+ master_timeout=60,
+ worker_refresh_interval=60,
+ worker_refresh_batch_size=2,
+ reload_on_plugin_change=True,
+ )
+ mock.patch.object(self.monitor, '_generate_plugin_state', return_value={}).start()
+ mock.patch.object(self.monitor, '_get_num_ready_workers_running', return_value=4).start()
+ mock.patch.object(self.monitor, '_get_num_workers_running', return_value=4).start()
+ mock.patch.object(self.monitor, '_spawn_new_workers', return_value=None).start()
+ mock.patch.object(self.monitor, '_kill_old_workers', return_value=None).start()
+ mock.patch.object(self.monitor, '_reload_gunicorn', return_value=None).start()
+ # Patchers started with start() must be stopped; undo them all when the test ends.
+ self.addCleanup(mock.patch.stopall)
+
+ @mock.patch('airflow.bin.cli.time.sleep')
+ def test_should_wait_for_workers_to_start(self, mock_sleep):
+ self.monitor._get_num_ready_workers_running.return_value = 0
+ self.monitor._get_num_workers_running.return_value = 4
+ self.monitor._check_workers()
+ self.monitor._spawn_new_workers.assert_not_called() # pylint: disable=no-member
+ self.monitor._kill_old_workers.assert_not_called() # pylint: disable=no-member
+ self.monitor._reload_gunicorn.assert_not_called() # pylint: disable=no-member
+
+ @mock.patch('airflow.bin.cli.time.sleep')
+ def test_should_kill_excess_workers(self, mock_sleep):
+ self.monitor._get_num_ready_workers_running.return_value = 10
+ self.monitor._get_num_workers_running.return_value = 10
+ self.monitor._check_workers()
+ self.monitor._spawn_new_workers.assert_not_called() # pylint: disable=no-member
+ self.monitor._kill_old_workers.assert_called_once_with(2) # pylint: disable=no-member
+ self.monitor._reload_gunicorn.assert_not_called() # pylint: disable=no-member
+
+ @mock.patch('airflow.bin.cli.time.sleep')
+ def test_should_start_new_workers_when_missing(self, mock_sleep):
+ # NOTE: with 2 of 4 workers and batch size 2, the missing count, the batch size and the
+ # running count are all 2, so this case cannot tell those quantities apart.
+ self.monitor._get_num_ready_workers_running.return_value = 2
+ self.monitor._get_num_workers_running.return_value = 2
+ self.monitor._check_workers()
+ self.monitor._spawn_new_workers.assert_called_once_with(2) # pylint: disable=no-member
+ self.monitor._kill_old_workers.assert_not_called() # pylint: disable=no-member
+ self.monitor._reload_gunicorn.assert_not_called() # pylint: disable=no-member
+
+ @mock.patch('airflow.bin.cli.time.sleep')
+ def test_should_start_new_workers_when_refresh_interval_has_passed(self, mock_sleep):
+ self.monitor._last_refresh_time -= 200
+ self.monitor._check_workers()
+ self.monitor._spawn_new_workers.assert_called_once_with(2) # pylint: disable=no-member
+ self.monitor._kill_old_workers.assert_not_called() # pylint: disable=no-member
+ self.monitor._reload_gunicorn.assert_not_called() # pylint: disable=no-member
+ self.assertAlmostEqual(self.monitor._last_refresh_time, timetime(), delta=5)
+
+ @mock.patch('airflow.bin.cli.time.sleep')
+ def test_should_reload_when_plugin_has_been_changed(self, mock_sleep):
+ self.monitor._generate_plugin_state.return_value = {'AA': 12}
+
+ self.monitor._check_workers()
+
+ self.monitor._spawn_new_workers.assert_not_called() # pylint: disable=no-member
+ self.monitor._kill_old_workers.assert_not_called() # pylint: disable=no-member
+ self.monitor._reload_gunicorn.assert_not_called() # pylint: disable=no-member
+
+ self.monitor._generate_plugin_state.return_value = {'AA': 32}
+
+ self.monitor._check_workers()
+
+ self.monitor._spawn_new_workers.assert_not_called() # pylint: disable=no-member
+ self.monitor._kill_old_workers.assert_not_called() # pylint: disable=no-member
+ self.monitor._reload_gunicorn.assert_not_called() # pylint: disable=no-member
+
+ self.monitor._generate_plugin_state.return_value = {'AA': 32}
+
+ self.monitor._check_workers()
+
+ self.monitor._spawn_new_workers.assert_not_called() # pylint: disable=no-member
+ self.monitor._kill_old_workers.assert_not_called() # pylint: disable=no-member
+ self.monitor._reload_gunicorn.assert_called_once_with() # pylint: disable=no-member
+ self.assertAlmostEqual(self.monitor._last_refresh_time, timetime(), delta=5)
+
+
+class TestGunicornMonitorGeneratePluginState(unittest.TestCase):
+ @staticmethod
+ def _prepare_test_file(filepath, size):
+ """Create (or overwrite) filepath with `size` bytes of "A", creating parent directories."""
+ try:
+ os.makedirs(os.path.dirname(filepath))
+ except OSError as e:
+ # be happy if someone already created the path
+ if e.errno != errno.EEXIST:
+ raise
+ with open(filepath, "w") as test_file:
+ test_file.write("A" * size)
+ test_file.flush()
+
+ def test_should_detect_changes_in_directory(self):
+ with TemporaryDirectory(prefix="tmp") as tempdir, \
+ mock.patch("airflow.bin.cli.settings.PLUGINS_FOLDER", tempdir):
+ self._prepare_test_file("{}/file1.txt".format(tempdir), 100)
+ self._prepare_test_file("{}/nested/nested/nested/nested/file2.txt".format(tempdir), 200)
+ self._prepare_test_file("{}/file3.txt".format(tempdir), 300)
+
+ monitor = cli.GunicornMonitor(
+ gunicorn_master_proc=mock.MagicMock(),
+ num_workers_expected=4,
+ master_timeout=60,
+ worker_refresh_interval=60,
+ worker_refresh_batch_size=2,
+ reload_on_plugin_change=True,
+ )
+
+ # When the files have not changed, the result should be constant
+ state_a = monitor._generate_plugin_state()
+ state_b = monitor._generate_plugin_state()
+
+ self.assertEqual(state_a, state_b)
+ self.assertEqual(3, len(state_a))
+
+ # Should detect new file
+ self._prepare_test_file("{}/file4.txt".format(tempdir), 400)
+
+ state_c = monitor._generate_plugin_state()
+
+ self.assertNotEqual(state_b, state_c)
+ self.assertEqual(4, len(state_c))
+
+ # Should detect changes in files
+ self._prepare_test_file("{}/file4.txt".format(tempdir), 450)
+
+ state_d = monitor._generate_plugin_state()
+
+ self.assertNotEqual(state_c, state_d)
+ self.assertEqual(4, len(state_d))
+
+ # Should support large files (larger than the 4096-byte hashing chunk)
+ self._prepare_test_file("{}/file4.txt".format(tempdir), 4000000)
+
+ state_e = monitor._generate_plugin_state()
+
+ # Compare with the immediately preceding state so the large rewrite itself is detected
+ self.assertNotEqual(state_d, state_e)
+ self.assertEqual(4, len(state_e))
+
+
+class TestCLIGetNumReadyWorkersRunning(unittest.TestCase):
+ """Tests for GunicornMonitor._get_num_ready_workers_running with psutil.Process mocked."""
+
+ def setUp(self):
+ self.gunicorn_master_proc = mock.Mock(pid=2137)
+ self.child = mock.MagicMock()
+ self.process = mock.MagicMock()
+ # Plugin tracking is irrelevant here; disabling it avoids hashing the real plugins folder
+ # in the constructor.
+ self.monitor = cli.GunicornMonitor(
+ gunicorn_master_proc=self.gunicorn_master_proc,
+ num_workers_expected=4,
+ master_timeout=60,
+ worker_refresh_interval=60,
+ worker_refresh_batch_size=2,
+ reload_on_plugin_change=False,
+ )
+
+ def test_ready_prefix_on_cmdline(self):
+ self.child.cmdline.return_value = [settings.GUNICORN_WORKER_READY_PREFIX]
+ self.process.children.return_value = [self.child]
+
+ with mock.patch('psutil.Process', return_value=self.process):
+ self.assertEqual(self.monitor._get_num_ready_workers_running(), 1)
+
+ def test_ready_prefix_on_cmdline_no_children(self):
+ self.process.children.return_value = []
+
+ with mock.patch('psutil.Process', return_value=self.process):
+ self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
+
+ def test_ready_prefix_on_cmdline_zombie(self):
+ self.child.cmdline.return_value = []
+ self.process.children.return_value = [self.child]
+
+ with mock.patch('psutil.Process', return_value=self.process):
+ self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
+
+ def test_ready_prefix_on_cmdline_dead_process(self):
+ self.child.cmdline.side_effect = psutil.NoSuchProcess(11347)
+ self.process.children.return_value = [self.child]
+
+ with mock.patch('psutil.Process', return_value=self.process):
+ self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)