You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/30 10:48:22 UTC

[airflow] 02/05: Reload gunicorn when plugins has beeen changed (#8997)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 36f79a9b227992f2efd6dca0ae7d4ad15dc8d2e6
Author: Kamil BreguĊ‚a <ka...@polidea.com>
AuthorDate: Mon Jun 29 20:38:26 2020 +0200

    Reload gunicorn when plugins has beeen changed (#8997)
    
    (cherry picked from commit 1c48ffbe25c3e304660b7e75a49e88bd114dde46)
---
 airflow/bin/cli.py                           | 350 +++++++++++++++++++--------
 airflow/config_templates/config.yml          |   8 +
 airflow/config_templates/default_airflow.cfg |   4 +
 tests/cli/test_cli.py                        | 233 +++++++++++++++---
 4 files changed, 452 insertions(+), 143 deletions(-)

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 8b5522c..b81c7b1 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -20,6 +20,7 @@
 
 from __future__ import print_function
 import errno
+import hashlib
 import importlib
 import locale
 import logging
@@ -777,31 +778,11 @@ def clear(args):
     )
 
 
-def get_num_ready_workers_running(gunicorn_master_proc):
-    workers = psutil.Process(gunicorn_master_proc.pid).children()
-
-    def ready_prefix_on_cmdline(proc):
-        try:
-            cmdline = proc.cmdline()
-            if len(cmdline) > 0:
-                return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
-        except psutil.NoSuchProcess:
-            pass
-        return False
-
-    ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)]
-    return len(ready_workers)
-
-
-def get_num_workers_running(gunicorn_master_proc):
-    workers = psutil.Process(gunicorn_master_proc.pid).children()
-    return len(workers)
-
-
-def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
+class GunicornMonitor(LoggingMixin):
     """
     Runs forever, monitoring the child processes of @gunicorn_master_proc and
-    restarting workers occasionally.
+    restarting workers occasionally or when files in the plug-in directory
+    has been modified.
     Each iteration of the loop traverses one edge of this state transition
     diagram, where each state (node) represents
     [ num_ready_workers_running / num_workers_running ]. We expect most time to
@@ -818,92 +799,245 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
     master process, which increases and decreases the number of child workers
     respectively. Gunicorn guarantees that on TTOU workers are terminated
     gracefully and that the oldest worker is terminated.
+
+    :param gunicorn_master_proc:  handle for the main Gunicorn process
+    :param num_workers_expected:  Number of workers to run the Gunicorn web server
+    :param master_timeout: Number of seconds the webserver waits before killing gunicorn master that
+        doesn't respond
+    :param worker_refresh_interval: Number of seconds to wait before refreshing a batch of workers.
+    :param worker_refresh_batch_size: Number of workers to refresh at a time. When set to 0, worker
+        refresh is disabled. When nonzero, airflow periodically refreshes webserver workers by
+        bringing up new ones and killing old ones.
+    :param reload_on_plugin_change: If set to True, Airflow will track files in plugins_follder directory.
+        When it detects changes, then reload the gunicorn.
     """
+    def __init__(
+        self,
+        gunicorn_master_proc,
+        num_workers_expected,
+        master_timeout,
+        worker_refresh_interval,
+        worker_refresh_batch_size,
+        reload_on_plugin_change
+    ):
+        super(GunicornMonitor, self).__init__()
+        self.gunicorn_master_proc = gunicorn_master_proc
+        self.num_workers_expected = num_workers_expected
+        self.master_timeout = master_timeout
+        self.worker_refresh_interval = worker_refresh_interval
+        self.worker_refresh_batch_size = worker_refresh_batch_size
+        self.reload_on_plugin_change = reload_on_plugin_change
+
+        self._num_workers_running = 0
+        self._num_ready_workers_running = 0
+        self._last_refresh_time = time.time() if worker_refresh_interval > 0 else None
+        self._last_plugin_state = self._generate_plugin_state() if reload_on_plugin_change else None
+        self._restart_on_next_plugin_check = False
+
+    def _generate_plugin_state(self):
+        """
+        Generate dict of filenames and last modification time of all files in settings.PLUGINS_FOLDER
+        directory.
+        """
+        if not settings.PLUGINS_FOLDER:
+            return {}
+
+        all_filenames = []
+        for (root, _, filenames) in os.walk(settings.PLUGINS_FOLDER):
+            all_filenames.extend(os.path.join(root, f) for f in filenames)
+        plugin_state = {f: self._get_file_hash(f) for f in sorted(all_filenames)}
+        return plugin_state
+
+    @staticmethod
+    def _get_file_hash(fname):
+        """Calculate MD5 hash for file"""
+        hash_md5 = hashlib.md5()
+        with open(fname, "rb") as f:
+            for chunk in iter(lambda: f.read(4096), b""):
+                hash_md5.update(chunk)
+        return hash_md5.hexdigest()
+
+    def _get_num_ready_workers_running(self):
+        """Returns number of ready Gunicorn workers by looking for READY_PREFIX in process name"""
+        workers = psutil.Process(self.gunicorn_master_proc.pid).children()
+
+        def ready_prefix_on_cmdline(proc):
+            try:
+                cmdline = proc.cmdline()
+                if len(cmdline) > 0:  # pylint: disable=len-as-condition
+                    return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
+            except psutil.NoSuchProcess:
+                pass
+            return False
+
+        ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)]
+        return len(ready_workers)
 
-    def wait_until_true(fn, timeout=0):
+    def _get_num_workers_running(self):
+        """Returns number of running Gunicorn workers processes"""
+        workers = psutil.Process(self.gunicorn_master_proc.pid).children()
+        return len(workers)
+
+    def _wait_until_true(self, fn, timeout=0):
         """
         Sleeps until fn is true
         """
-        t = time.time()
+        start_time = time.time()
         while not fn():
-            if 0 < timeout and timeout <= time.time() - t:
+            if 0 < timeout <= time.time() - start_time:
                 raise AirflowWebServerTimeout(
-                    "No response from gunicorn master within {0} seconds"
-                    .format(timeout))
+                    "No response from gunicorn master within {0} seconds".format(timeout)
+                )
             time.sleep(0.1)
 
-    def start_refresh(gunicorn_master_proc):
-        batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
-        log.debug('%s doing a refresh of %s workers', state, batch_size)
-        sys.stdout.flush()
-        sys.stderr.flush()
-
+    def _spawn_new_workers(self, count):
+        """
+        Send signal to kill the worker.
+        :param count: The number of workers to spawn
+        """
         excess = 0
-        for _ in range(batch_size):
-            gunicorn_master_proc.send_signal(signal.SIGTTIN)
+        for _ in range(count):
+            # TTIN: Increment the number of processes by one
+            self.gunicorn_master_proc.send_signal(signal.SIGTTIN)
             excess += 1
-            wait_until_true(lambda: num_workers_expected + excess ==
-                            get_num_workers_running(gunicorn_master_proc),
-                            master_timeout)
+            self._wait_until_true(
+                lambda: self.num_workers_expected + excess == self._get_num_workers_running(),
+                timeout=self.master_timeout
+            )
 
-    try:
-        wait_until_true(lambda: num_workers_expected ==
-                        get_num_workers_running(gunicorn_master_proc),
-                        master_timeout)
-        while True:
-            num_workers_running = get_num_workers_running(gunicorn_master_proc)
-            num_ready_workers_running = \
-                get_num_ready_workers_running(gunicorn_master_proc)
-
-            state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running)
-
-            # Whenever some workers are not ready, wait until all workers are ready
-            if num_ready_workers_running < num_workers_running:
-                log.debug('%s some workers are starting up, waiting...', state)
-                sys.stdout.flush()
+    def _kill_old_workers(self, count):
+        """
+        Send signal to kill the worker.
+        :param count: The number of workers to kill
+        """
+        for _ in range(count):
+            count -= 1
+            # TTOU: Decrement the number of processes by one
+            self.gunicorn_master_proc.send_signal(signal.SIGTTOU)
+            self._wait_until_true(
+                lambda: self.num_workers_expected + count == self._get_num_workers_running(),
+                timeout=self.master_timeout)
+
+    def _reload_gunicorn(self):
+        """
+        Send signal to reload the gunciron configuration. When gunciorn receive signals, it reload the
+        configuration, start the new worker processes with a new configuration and gracefully
+        shutdown older workers.
+        """
+        # HUP: Reload the configuration.
+        self.gunicorn_master_proc.send_signal(signal.SIGHUP)
+        time.sleep(1)
+        self._wait_until_true(
+            lambda: self.num_workers_expected == self._get_num_workers_running(),
+            timeout=self.master_timeout
+        )
+
+    def start(self):
+        """
+        Starts monitoring the webserver.
+        """
+        try:  # pylint: disable=too-many-nested-blocks
+            self._wait_until_true(
+                lambda: self.num_workers_expected == self._get_num_workers_running(),
+                timeout=self.master_timeout
+            )
+            while True:
+                if self.gunicorn_master_proc.poll() is not None:
+                    sys.exit(self.gunicorn_master_proc.returncode)
+                self._check_workers()
+                # Throttle loop
                 time.sleep(1)
 
-            # Kill a worker gracefully by asking gunicorn to reduce number of workers
-            elif num_workers_running > num_workers_expected:
-                excess = num_workers_running - num_workers_expected
-                log.debug('%s killing %s workers', state, excess)
-
-                for _ in range(excess):
-                    gunicorn_master_proc.send_signal(signal.SIGTTOU)
-                    excess -= 1
-                    wait_until_true(lambda: num_workers_expected + excess ==
-                                    get_num_workers_running(gunicorn_master_proc),
-                                    master_timeout)
-
-            # Start a new worker by asking gunicorn to increase number of workers
-            elif num_workers_running == num_workers_expected:
-                refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
-                log.debug(
-                    '%s sleeping for %ss starting doing a refresh...',
-                    state, refresh_interval
+        except (AirflowWebServerTimeout, OSError) as err:
+            self.log.error(err)
+            self.log.error("Shutting down webserver")
+            try:
+                self.gunicorn_master_proc.terminate()
+                self.gunicorn_master_proc.wait()
+            finally:
+                sys.exit(1)
+
+    def _check_workers(self):
+        num_workers_running = self._get_num_workers_running()
+        num_ready_workers_running = self._get_num_ready_workers_running()
+
+        # Whenever some workers are not ready, wait until all workers are ready
+        if num_ready_workers_running < num_workers_running:
+            self.log.debug(
+                '[%d / %d] Some workers are starting up, waiting...',
+                num_ready_workers_running, num_workers_running
+            )
+            time.sleep(1)
+            return
+
+        # If there are too many workers, then kill a worker gracefully by asking gunicorn to reduce
+        # number of workers
+        if num_workers_running > self.num_workers_expected:
+            excess = min(num_workers_running - self.num_workers_expected, self.worker_refresh_batch_size)
+            self.log.debug(
+                '[%d / %d] Killing %s workers', num_ready_workers_running, num_workers_running, excess
+            )
+            self._kill_old_workers(excess)
+            return
+
+        # If there are too few workers, start a new worker by asking gunicorn
+        # to increase number of workers
+        if num_workers_running < self.num_workers_expected:
+            self.log.error(
+                "[%d / %d] Some workers seem to have died and gunicorn did not restart "
+                "them as expected",
+                num_ready_workers_running, num_workers_running
+            )
+            time.sleep(10)
+            num_workers_running = self._get_num_workers_running()
+            if num_workers_running < self.num_workers_expected:
+                new_worker_count = min(
+                    num_workers_running - self.worker_refresh_batch_size, self.worker_refresh_batch_size
+                )
+                self.log.debug(
+                    '[%d / %d] Spawning %d workers',
+                    num_ready_workers_running, num_workers_running, new_worker_count
+                )
+                self._spawn_new_workers(num_workers_running)
+            return
+
+        # Now the number of running and expected worker should be equal
+
+        # If workers should be restarted periodically.
+        if self.worker_refresh_interval > 0 and self._last_refresh_time:
+            # and we refreshed the workers a long time ago, refresh the workers
+            last_refresh_diff = (time.time() - self._last_refresh_time)
+            if self.worker_refresh_interval < last_refresh_diff:
+                num_new_workers = self.worker_refresh_batch_size
+                self.log.debug(
+                    '[%d / %d] Starting doing a refresh. Starting %d workers.',
+                    num_ready_workers_running, num_workers_running, num_new_workers
                 )
-                time.sleep(refresh_interval)
-                start_refresh(gunicorn_master_proc)
+                self._spawn_new_workers(num_new_workers)
+                self._last_refresh_time = time.time()
+                return
 
-            else:
-                # num_ready_workers_running == num_workers_running < num_workers_expected
-                log.error((
-                    "%s some workers seem to have died and gunicorn"
-                    "did not restart them as expected"
-                ), state)
-                time.sleep(10)
-                if len(
-                    psutil.Process(gunicorn_master_proc.pid).children()
-                ) < num_workers_expected:
-                    start_refresh(gunicorn_master_proc)
-    except (AirflowWebServerTimeout, OSError) as err:
-        log.error(err)
-        log.error("Shutting down webserver")
-        try:
-            gunicorn_master_proc.terminate()
-            gunicorn_master_proc.wait()
-        finally:
-            sys.exit(1)
+        # if we should check the directory with the plugin,
+        if self.reload_on_plugin_change:
+            # compare the previous and current contents of the directory
+            new_state = self._generate_plugin_state()
+            # If changed, wait until its content is fully saved.
+            if new_state != self._last_plugin_state:
+                self.log.debug(
+                    '[%d / %d] Plugins folder changed. The gunicorn will be restarted the next time the '
+                    'plugin directory is checked, if there is no change in it.',
+                    num_ready_workers_running, num_workers_running
+                )
+                self._restart_on_next_plugin_check = True
+                self._last_plugin_state = new_state
+            elif self._restart_on_next_plugin_check:
+                self.log.debug(
+                    '[%d / %d] Starts reloading the gunicorn configuration.',
+                    num_ready_workers_running, num_workers_running
+                )
+                self._restart_on_next_plugin_check = False
+                self._last_refresh_time = time.time()
+                self._reload_gunicorn()
 
 
 @cli_utils.action_logging
@@ -962,13 +1096,13 @@ def webserver(args):
 
         run_args = [
             'gunicorn',
-            '-w', str(num_workers),
-            '-k', str(args.workerclass),
-            '-t', str(worker_timeout),
-            '-b', args.hostname + ':' + str(args.port),
-            '-n', 'airflow-webserver',
-            '-p', str(pid),
-            '-c', 'python:airflow.www.gunicorn_config',
+            '--workers', str(num_workers),
+            '--worker-class', str(args.workerclass),
+            '--timeout', str(worker_timeout),
+            '--bind', args.hostname + ':' + str(args.port),
+            '--name', 'airflow-webserver',
+            '--pid', str(pid),
+            '--config', 'python:airflow.www.gunicorn_config',
         ]
 
         if args.access_logfile:
@@ -978,7 +1112,7 @@ def webserver(args):
             run_args += ['--error-logfile', str(args.error_logfile)]
 
         if args.daemon:
-            run_args += ['-D']
+            run_args += ['--deamon']
 
         if ssl_cert:
             run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]
@@ -995,12 +1129,14 @@ def webserver(args):
 
         def monitor_gunicorn(gunicorn_master_proc):
             # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
-            if conf.getint('webserver', 'worker_refresh_interval') > 0:
-                master_timeout = conf.getint('webserver', 'web_server_master_timeout')
-                restart_workers(gunicorn_master_proc, num_workers, master_timeout)
-            else:
-                while True:
-                    time.sleep(1)
+            GunicornMonitor(
+                gunicorn_master_proc=gunicorn_master_proc,
+                num_workers_expected=num_workers,
+                master_timeout=conf.getint('webserver', 'web_server_master_timeout'),
+                worker_refresh_interval=conf.getint('webserver', 'worker_refresh_interval', fallback=10),
+                worker_refresh_batch_size=conf.getint('webserver', 'worker_refresh_batch_size', fallback=1),
+                reload_on_plugin_change=conf.getint('webserver', 'reload_on_plugin_change', fallback=1),
+            ).start()
 
         if args.daemon:
             base, ext = os.path.splitext(pid)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index e32b8cc..1f697b1 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -699,6 +699,14 @@
       type: string
       example: ~
       default: "30"
+    - name: reload_on_plugin_change
+      description: |
+        If set to True, Airflow will track files in plugins_follder directory. When it detects changes,
+        then reload the gunicorn.
+      version_added: ~
+      type: boolean
+      example: ~
+      default: False
     - name: secret_key
       description: |
         Secret key used to run your flask app
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index c75d3ae..cfc92ad 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -341,6 +341,10 @@ worker_refresh_batch_size = 1
 # Number of seconds to wait before refreshing a batch of workers.
 worker_refresh_interval = 30
 
+# If set to True, Airflow will track files in plugins_follder directory. When it detects changes,
+# then reload the gunicorn.
+reload_on_plugin_change =
+
 # Secret key used to run your flask app
 # It should be as random as possible
 secret_key = temporary_key
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index a2a81ac..5748057 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -17,6 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import contextlib
+import errno
 import io
 
 import logging
@@ -28,8 +29,8 @@ from six import StringIO, PY2
 import sys
 
 from datetime import datetime, timedelta, time
-from mock import patch, Mock, MagicMock
-from time import sleep
+from mock import patch, MagicMock
+from time import sleep, time as timetime
 import psutil
 import pytz
 import subprocess
@@ -37,9 +38,10 @@ import pytest
 from argparse import Namespace
 from airflow import settings
 import airflow.bin.cli as cli
-from airflow.bin.cli import get_num_ready_workers_running, run, get_dag
+from airflow.bin.cli import run, get_dag
 from airflow.models import TaskInstance
 from airflow.utils import timezone
+from airflow.utils.file import TemporaryDirectory
 from airflow.utils.state import State
 from airflow.settings import Session
 from airflow import models
@@ -154,39 +156,6 @@ class TestCLI(unittest.TestCase):
         cls.dagbag = models.DagBag(include_examples=True)
         cls.parser = cli.CLIFactory.get_parser()
 
-    def setUp(self):
-        self.gunicorn_master_proc = Mock(pid=None)
-        self.children = MagicMock()
-        self.child = MagicMock()
-        self.process = MagicMock()
-
-    def test_ready_prefix_on_cmdline(self):
-        self.child.cmdline.return_value = [settings.GUNICORN_WORKER_READY_PREFIX]
-        self.process.children.return_value = [self.child]
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 1)
-
-    def test_ready_prefix_on_cmdline_no_children(self):
-        self.process.children.return_value = []
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
-    def test_ready_prefix_on_cmdline_zombie(self):
-        self.child.cmdline.return_value = []
-        self.process.children.return_value = [self.child]
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
-    def test_ready_prefix_on_cmdline_dead_process(self):
-        self.child.cmdline.side_effect = psutil.NoSuchProcess(11347)
-        self.process.children.return_value = [self.child]
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
     def test_cli_webserver_debug(self):
         env = os.environ.copy()
         p = psutil.Popen(["airflow", "webserver", "-d"], env=env)
@@ -847,3 +816,195 @@ class TestShowInfo(unittest.TestCase):
         self.assertIn("https://file.io/TEST", temp_stdout.getvalue())
         content = mock_requests.post.call_args[1]["files"]["file"][1]
         self.assertIn("postgresql+psycopg2://p...s:PASSWORD@postgres/airflow", content)
+
+
+class TestGunicornMonitor(unittest.TestCase):
+
+    def setUp(self):
+        self.gunicorn_master_proc = mock.Mock(pid=2137)
+        self.monitor = cli.GunicornMonitor(
+            gunicorn_master_proc=self.gunicorn_master_proc,
+            num_workers_expected=4,
+            master_timeout=60,
+            worker_refresh_interval=60,
+            worker_refresh_batch_size=2,
+            reload_on_plugin_change=True,
+        )
+        mock.patch.object(self.monitor, '_generate_plugin_state', return_value={}).start()
+        mock.patch.object(self.monitor, '_get_num_ready_workers_running', return_value=4).start()
+        mock.patch.object(self.monitor, '_get_num_workers_running', return_value=4).start()
+        mock.patch.object(self.monitor, '_spawn_new_workers', return_value=None).start()
+        mock.patch.object(self.monitor, '_kill_old_workers', return_value=None).start()
+        mock.patch.object(self.monitor, '_reload_gunicorn', return_value=None).start()
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_wait_for_workers_to_start(self, mock_sleep):
+        self.monitor._get_num_ready_workers_running.return_value = 0
+        self.monitor._get_num_workers_running.return_value = 4
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_kill_excess_workers(self, mock_sleep):
+        self.monitor._get_num_ready_workers_running.return_value = 10
+        self.monitor._get_num_workers_running.return_value = 10
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_called_once_with(2)  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_start_new_workers_when_missing(self, mock_sleep):
+        self.monitor._get_num_ready_workers_running.return_value = 2
+        self.monitor._get_num_workers_running.return_value = 2
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_called_once_with(2)  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_start_new_workers_when_refresh_interval_has_passed(self, mock_sleep):
+        self.monitor._last_refresh_time -= 200
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_called_once_with(2)  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+        self.assertAlmostEqual(self.monitor._last_refresh_time, timetime(), delta=5)
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_reload_when_plugin_has_been_changed(self, mock_sleep):
+        self.monitor._generate_plugin_state.return_value = {'AA': 12}
+
+        self.monitor._check_workers()
+
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+        self.monitor._generate_plugin_state.return_value = {'AA': 32}
+
+        self.monitor._check_workers()
+
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+        self.monitor._generate_plugin_state.return_value = {'AA': 32}
+
+        self.monitor._check_workers()
+
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_called_once_with()  # pylint: disable=no-member
+        self.assertAlmostEqual(self.monitor._last_refresh_time, timetime(), delta=5)
+
+
+class TestGunicornMonitorGeneratePluginState(unittest.TestCase):
+    @staticmethod
+    def _prepare_test_file(filepath, size):
+        try:
+            os.makedirs(os.path.dirname(filepath))
+        except OSError as e:
+            # be happy if someone already created the path
+            if e.errno != errno.EEXIST:
+                raise
+        with open(filepath, "w") as file:
+            file.write("A" * size)
+            file.flush()
+
+    def test_should_detect_changes_in_directory(self):
+        with TemporaryDirectory(prefix="tmp") as tempdir, \
+                mock.patch("airflow.bin.cli.settings.PLUGINS_FOLDER", tempdir):
+            self._prepare_test_file("{}/file1.txt".format(tempdir), 100)
+            self._prepare_test_file("{}/nested/nested/nested/nested/file2.txt".format(tempdir), 200)
+            self._prepare_test_file("{}/file3.txt".format(tempdir), 300)
+
+            monitor = cli.GunicornMonitor(
+                gunicorn_master_proc=mock.MagicMock(),
+                num_workers_expected=4,
+                master_timeout=60,
+                worker_refresh_interval=60,
+                worker_refresh_batch_size=2,
+                reload_on_plugin_change=True,
+            )
+
+            # When the files have not changed, the result should be constant
+            state_a = monitor._generate_plugin_state()
+            state_b = monitor._generate_plugin_state()
+
+            self.assertEqual(state_a, state_b)
+            self.assertEqual(3, len(state_a))
+
+            # Should detect new file
+            self._prepare_test_file("{}/file4.txt".format(tempdir), 400)
+
+            state_c = monitor._generate_plugin_state()
+
+            self.assertNotEqual(state_b, state_c)
+            self.assertEqual(4, len(state_c))
+
+            # Should detect changes in files
+            self._prepare_test_file("{}/file4.txt".format(tempdir), 450)
+
+            state_d = monitor._generate_plugin_state()
+
+            self.assertNotEqual(state_c, state_d)
+            self.assertEqual(4, len(state_d))
+
+            # Should support large files
+            self._prepare_test_file("{}/file4.txt".format(tempdir), 4000000)
+
+            state_d = monitor._generate_plugin_state()
+
+            self.assertNotEqual(state_c, state_d)
+            self.assertEqual(4, len(state_d))
+
+
+class TestCLIGetNumReadyWorkersRunning(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.parser = cli.get_parser()
+
+    def setUp(self):
+        self.gunicorn_master_proc = mock.Mock(pid=2137)
+        self.children = mock.MagicMock()
+        self.child = mock.MagicMock()
+        self.process = mock.MagicMock()
+        self.monitor = cli.GunicornMonitor(
+            gunicorn_master_proc=self.gunicorn_master_proc,
+            num_workers_expected=4,
+            master_timeout=60,
+            worker_refresh_interval=60,
+            worker_refresh_batch_size=2,
+            reload_on_plugin_change=True,
+        )
+
+    def test_ready_prefix_on_cmdline(self):
+        self.child.cmdline.return_value = [settings.GUNICORN_WORKER_READY_PREFIX]
+        self.process.children.return_value = [self.child]
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 1)
+
+    def test_ready_prefix_on_cmdline_no_children(self):
+        self.process.children.return_value = []
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
+
+    def test_ready_prefix_on_cmdline_zombie(self):
+        self.child.cmdline.return_value = []
+        self.process.children.return_value = [self.child]
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
+
+    def test_ready_prefix_on_cmdline_dead_process(self):
+        self.child.cmdline.side_effect = psutil.NoSuchProcess(11347)
+        self.process.children.return_value = [self.child]
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)