You are viewing a plain text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/29 21:18:07 UTC

[airflow] branch v1-10-test updated (1606f56 -> c0e5044)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 1606f56  fixup! fixup! Avoid color info in response of /dag_stats & /task_stats (#8742)
     new 152ee6e  Reload gunicorn when plugins have been changed (#8997)
     new c0e5044  Fix failing tests from #8997 (#9576)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/bin/cli.py                           | 352 +++++++++++++++++++--------
 airflow/config_templates/config.yml          |   8 +
 airflow/config_templates/default_airflow.cfg |   4 +
 tests/cli/test_cli.py                        | 233 +++++++++++++++---
 4 files changed, 454 insertions(+), 143 deletions(-)


[airflow] 02/02: Fix failing tests from #8997 (#9576)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c0e504415daad0e39ed59f0d61f0660e99a50c50
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Mon Jun 29 21:02:44 2020 +0200

    Fix failing tests from #8997 (#9576)
    
    (cherry picked from commit 9858294f7252e626bec7b701280e81d4f27df452)
---
 airflow/bin/cli.py                           | 4 +++-
 airflow/config_templates/config.yml          | 4 ++--
 airflow/config_templates/default_airflow.cfg | 4 ++--
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 3c1ee39..305310c 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -1135,7 +1135,9 @@ def webserver(args):
                 master_timeout=conf.getint('webserver', 'web_server_master_timeout'),
                 worker_refresh_interval=conf.getint('webserver', 'worker_refresh_interval', fallback=10),
                 worker_refresh_batch_size=conf.getint('webserver', 'worker_refresh_batch_size', fallback=1),
-                reload_on_plugin_change=conf.getint('webserver', 'reload_on_plugin_change', fallback=1),
+                reload_on_plugin_change=conf.getboolean(
+                    'webserver', 'reload_on_plugin_change', fallback=False
+                ),
             ).start()
 
         if args.daemon:
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 1f697b1..f632cd5 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -701,12 +701,12 @@
       default: "30"
     - name: reload_on_plugin_change
       description: |
-        If set to True, Airflow will track files in plugins_follder directory. When it detects changes,
+        If set to True, Airflow will track files in plugins_folder directory. When it detects changes,
         then reload the gunicorn.
       version_added: ~
       type: boolean
       example: ~
-      default: False
+      default: "False"
     - name: secret_key
       description: |
         Secret key used to run your flask app
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index cfc92ad..a061d46 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -341,9 +341,9 @@ worker_refresh_batch_size = 1
 # Number of seconds to wait before refreshing a batch of workers.
 worker_refresh_interval = 30
 
-# If set to True, Airflow will track files in plugins_follder directory. When it detects changes,
+# If set to True, Airflow will track files in plugins_folder directory. When it detects changes,
 # then reload the gunicorn.
-reload_on_plugin_change =
+reload_on_plugin_change = False
 
 # Secret key used to run your flask app
 # It should be as random as possible


[airflow] 01/02: Reload gunicorn when plugins have been changed (#8997)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 152ee6ee9f16aa5b3c7a6c3535d452a09c7cf381
Author: Kamil Breguła <ka...@polidea.com>
AuthorDate: Mon Jun 29 20:38:26 2020 +0200

    Reload gunicorn when plugins have been changed (#8997)
    
    (cherry picked from commit 1c48ffbe25c3e304660b7e75a49e88bd114dde46)
---
 airflow/bin/cli.py                           | 350 +++++++++++++++++++--------
 airflow/config_templates/config.yml          |   8 +
 airflow/config_templates/default_airflow.cfg |   4 +
 tests/cli/test_cli.py                        | 233 +++++++++++++++---
 4 files changed, 452 insertions(+), 143 deletions(-)

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index b3ffea7..3c1ee39 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -20,6 +20,7 @@
 
 from __future__ import print_function
 import errno
+import hashlib
 import importlib
 import locale
 import logging
@@ -777,31 +778,11 @@ def clear(args):
     )
 
 
-def get_num_ready_workers_running(gunicorn_master_proc):
-    workers = psutil.Process(gunicorn_master_proc.pid).children()
-
-    def ready_prefix_on_cmdline(proc):
-        try:
-            cmdline = proc.cmdline()
-            if len(cmdline) > 0:
-                return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
-        except psutil.NoSuchProcess:
-            pass
-        return False
-
-    ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)]
-    return len(ready_workers)
-
-
-def get_num_workers_running(gunicorn_master_proc):
-    workers = psutil.Process(gunicorn_master_proc.pid).children()
-    return len(workers)
-
-
-def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
+class GunicornMonitor(LoggingMixin):
     """
     Runs forever, monitoring the child processes of @gunicorn_master_proc and
-    restarting workers occasionally.
+    restarting workers occasionally or when files in the plug-in directory
+    has been modified.
     Each iteration of the loop traverses one edge of this state transition
     diagram, where each state (node) represents
     [ num_ready_workers_running / num_workers_running ]. We expect most time to
@@ -818,92 +799,245 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
     master process, which increases and decreases the number of child workers
     respectively. Gunicorn guarantees that on TTOU workers are terminated
     gracefully and that the oldest worker is terminated.
+
+    :param gunicorn_master_proc:  handle for the main Gunicorn process
+    :param num_workers_expected:  Number of workers to run the Gunicorn web server
+    :param master_timeout: Number of seconds the webserver waits before killing gunicorn master that
+        doesn't respond
+    :param worker_refresh_interval: Number of seconds to wait before refreshing a batch of workers.
+    :param worker_refresh_batch_size: Number of workers to refresh at a time. When set to 0, worker
+        refresh is disabled. When nonzero, airflow periodically refreshes webserver workers by
+        bringing up new ones and killing old ones.
+    :param reload_on_plugin_change: If set to True, Airflow will track files in plugins_follder directory.
+        When it detects changes, then reload the gunicorn.
     """
+    def __init__(
+        self,
+        gunicorn_master_proc,
+        num_workers_expected,
+        master_timeout,
+        worker_refresh_interval,
+        worker_refresh_batch_size,
+        reload_on_plugin_change
+    ):
+        super(GunicornMonitor, self).__init__()
+        self.gunicorn_master_proc = gunicorn_master_proc
+        self.num_workers_expected = num_workers_expected
+        self.master_timeout = master_timeout
+        self.worker_refresh_interval = worker_refresh_interval
+        self.worker_refresh_batch_size = worker_refresh_batch_size
+        self.reload_on_plugin_change = reload_on_plugin_change
+
+        self._num_workers_running = 0
+        self._num_ready_workers_running = 0
+        self._last_refresh_time = time.time() if worker_refresh_interval > 0 else None
+        self._last_plugin_state = self._generate_plugin_state() if reload_on_plugin_change else None
+        self._restart_on_next_plugin_check = False
+
+    def _generate_plugin_state(self):
+        """
+        Generate dict of filenames and last modification time of all files in settings.PLUGINS_FOLDER
+        directory.
+        """
+        if not settings.PLUGINS_FOLDER:
+            return {}
+
+        all_filenames = []
+        for (root, _, filenames) in os.walk(settings.PLUGINS_FOLDER):
+            all_filenames.extend(os.path.join(root, f) for f in filenames)
+        plugin_state = {f: self._get_file_hash(f) for f in sorted(all_filenames)}
+        return plugin_state
+
+    @staticmethod
+    def _get_file_hash(fname):
+        """Calculate MD5 hash for file"""
+        hash_md5 = hashlib.md5()
+        with open(fname, "rb") as f:
+            for chunk in iter(lambda: f.read(4096), b""):
+                hash_md5.update(chunk)
+        return hash_md5.hexdigest()
+
+    def _get_num_ready_workers_running(self):
+        """Returns number of ready Gunicorn workers by looking for READY_PREFIX in process name"""
+        workers = psutil.Process(self.gunicorn_master_proc.pid).children()
+
+        def ready_prefix_on_cmdline(proc):
+            try:
+                cmdline = proc.cmdline()
+                if len(cmdline) > 0:  # pylint: disable=len-as-condition
+                    return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
+            except psutil.NoSuchProcess:
+                pass
+            return False
+
+        ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)]
+        return len(ready_workers)
 
-    def wait_until_true(fn, timeout=0):
+    def _get_num_workers_running(self):
+        """Returns number of running Gunicorn workers processes"""
+        workers = psutil.Process(self.gunicorn_master_proc.pid).children()
+        return len(workers)
+
+    def _wait_until_true(self, fn, timeout=0):
         """
         Sleeps until fn is true
         """
-        t = time.time()
+        start_time = time.time()
         while not fn():
-            if 0 < timeout and timeout <= time.time() - t:
+            if 0 < timeout <= time.time() - start_time:
                 raise AirflowWebServerTimeout(
-                    "No response from gunicorn master within {0} seconds"
-                    .format(timeout))
+                    "No response from gunicorn master within {0} seconds".format(timeout)
+                )
             time.sleep(0.1)
 
-    def start_refresh(gunicorn_master_proc):
-        batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
-        log.debug('%s doing a refresh of %s workers', state, batch_size)
-        sys.stdout.flush()
-        sys.stderr.flush()
-
+    def _spawn_new_workers(self, count):
+        """
+        Send signal to kill the worker.
+        :param count: The number of workers to spawn
+        """
         excess = 0
-        for _ in range(batch_size):
-            gunicorn_master_proc.send_signal(signal.SIGTTIN)
+        for _ in range(count):
+            # TTIN: Increment the number of processes by one
+            self.gunicorn_master_proc.send_signal(signal.SIGTTIN)
             excess += 1
-            wait_until_true(lambda: num_workers_expected + excess ==
-                            get_num_workers_running(gunicorn_master_proc),
-                            master_timeout)
+            self._wait_until_true(
+                lambda: self.num_workers_expected + excess == self._get_num_workers_running(),
+                timeout=self.master_timeout
+            )
 
-    try:
-        wait_until_true(lambda: num_workers_expected ==
-                        get_num_workers_running(gunicorn_master_proc),
-                        master_timeout)
-        while True:
-            num_workers_running = get_num_workers_running(gunicorn_master_proc)
-            num_ready_workers_running = \
-                get_num_ready_workers_running(gunicorn_master_proc)
-
-            state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running)
-
-            # Whenever some workers are not ready, wait until all workers are ready
-            if num_ready_workers_running < num_workers_running:
-                log.debug('%s some workers are starting up, waiting...', state)
-                sys.stdout.flush()
+    def _kill_old_workers(self, count):
+        """
+        Send signal to kill the worker.
+        :param count: The number of workers to kill
+        """
+        for _ in range(count):
+            count -= 1
+            # TTOU: Decrement the number of processes by one
+            self.gunicorn_master_proc.send_signal(signal.SIGTTOU)
+            self._wait_until_true(
+                lambda: self.num_workers_expected + count == self._get_num_workers_running(),
+                timeout=self.master_timeout)
+
+    def _reload_gunicorn(self):
+        """
+        Send signal to reload the gunciron configuration. When gunciorn receive signals, it reload the
+        configuration, start the new worker processes with a new configuration and gracefully
+        shutdown older workers.
+        """
+        # HUP: Reload the configuration.
+        self.gunicorn_master_proc.send_signal(signal.SIGHUP)
+        time.sleep(1)
+        self._wait_until_true(
+            lambda: self.num_workers_expected == self._get_num_workers_running(),
+            timeout=self.master_timeout
+        )
+
+    def start(self):
+        """
+        Starts monitoring the webserver.
+        """
+        try:  # pylint: disable=too-many-nested-blocks
+            self._wait_until_true(
+                lambda: self.num_workers_expected == self._get_num_workers_running(),
+                timeout=self.master_timeout
+            )
+            while True:
+                if self.gunicorn_master_proc.poll() is not None:
+                    sys.exit(self.gunicorn_master_proc.returncode)
+                self._check_workers()
+                # Throttle loop
                 time.sleep(1)
 
-            # Kill a worker gracefully by asking gunicorn to reduce number of workers
-            elif num_workers_running > num_workers_expected:
-                excess = num_workers_running - num_workers_expected
-                log.debug('%s killing %s workers', state, excess)
-
-                for _ in range(excess):
-                    gunicorn_master_proc.send_signal(signal.SIGTTOU)
-                    excess -= 1
-                    wait_until_true(lambda: num_workers_expected + excess ==
-                                    get_num_workers_running(gunicorn_master_proc),
-                                    master_timeout)
-
-            # Start a new worker by asking gunicorn to increase number of workers
-            elif num_workers_running == num_workers_expected:
-                refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
-                log.debug(
-                    '%s sleeping for %ss starting doing a refresh...',
-                    state, refresh_interval
+        except (AirflowWebServerTimeout, OSError) as err:
+            self.log.error(err)
+            self.log.error("Shutting down webserver")
+            try:
+                self.gunicorn_master_proc.terminate()
+                self.gunicorn_master_proc.wait()
+            finally:
+                sys.exit(1)
+
+    def _check_workers(self):
+        num_workers_running = self._get_num_workers_running()
+        num_ready_workers_running = self._get_num_ready_workers_running()
+
+        # Whenever some workers are not ready, wait until all workers are ready
+        if num_ready_workers_running < num_workers_running:
+            self.log.debug(
+                '[%d / %d] Some workers are starting up, waiting...',
+                num_ready_workers_running, num_workers_running
+            )
+            time.sleep(1)
+            return
+
+        # If there are too many workers, then kill a worker gracefully by asking gunicorn to reduce
+        # number of workers
+        if num_workers_running > self.num_workers_expected:
+            excess = min(num_workers_running - self.num_workers_expected, self.worker_refresh_batch_size)
+            self.log.debug(
+                '[%d / %d] Killing %s workers', num_ready_workers_running, num_workers_running, excess
+            )
+            self._kill_old_workers(excess)
+            return
+
+        # If there are too few workers, start a new worker by asking gunicorn
+        # to increase number of workers
+        if num_workers_running < self.num_workers_expected:
+            self.log.error(
+                "[%d / %d] Some workers seem to have died and gunicorn did not restart "
+                "them as expected",
+                num_ready_workers_running, num_workers_running
+            )
+            time.sleep(10)
+            num_workers_running = self._get_num_workers_running()
+            if num_workers_running < self.num_workers_expected:
+                new_worker_count = min(
+                    num_workers_running - self.worker_refresh_batch_size, self.worker_refresh_batch_size
+                )
+                self.log.debug(
+                    '[%d / %d] Spawning %d workers',
+                    num_ready_workers_running, num_workers_running, new_worker_count
+                )
+                self._spawn_new_workers(num_workers_running)
+            return
+
+        # Now the number of running and expected worker should be equal
+
+        # If workers should be restarted periodically.
+        if self.worker_refresh_interval > 0 and self._last_refresh_time:
+            # and we refreshed the workers a long time ago, refresh the workers
+            last_refresh_diff = (time.time() - self._last_refresh_time)
+            if self.worker_refresh_interval < last_refresh_diff:
+                num_new_workers = self.worker_refresh_batch_size
+                self.log.debug(
+                    '[%d / %d] Starting doing a refresh. Starting %d workers.',
+                    num_ready_workers_running, num_workers_running, num_new_workers
                 )
-                time.sleep(refresh_interval)
-                start_refresh(gunicorn_master_proc)
+                self._spawn_new_workers(num_new_workers)
+                self._last_refresh_time = time.time()
+                return
 
-            else:
-                # num_ready_workers_running == num_workers_running < num_workers_expected
-                log.error((
-                    "%s some workers seem to have died and gunicorn"
-                    "did not restart them as expected"
-                ), state)
-                time.sleep(10)
-                if len(
-                    psutil.Process(gunicorn_master_proc.pid).children()
-                ) < num_workers_expected:
-                    start_refresh(gunicorn_master_proc)
-    except (AirflowWebServerTimeout, OSError) as err:
-        log.error(err)
-        log.error("Shutting down webserver")
-        try:
-            gunicorn_master_proc.terminate()
-            gunicorn_master_proc.wait()
-        finally:
-            sys.exit(1)
+        # if we should check the directory with the plugin,
+        if self.reload_on_plugin_change:
+            # compare the previous and current contents of the directory
+            new_state = self._generate_plugin_state()
+            # If changed, wait until its content is fully saved.
+            if new_state != self._last_plugin_state:
+                self.log.debug(
+                    '[%d / %d] Plugins folder changed. The gunicorn will be restarted the next time the '
+                    'plugin directory is checked, if there is no change in it.',
+                    num_ready_workers_running, num_workers_running
+                )
+                self._restart_on_next_plugin_check = True
+                self._last_plugin_state = new_state
+            elif self._restart_on_next_plugin_check:
+                self.log.debug(
+                    '[%d / %d] Starts reloading the gunicorn configuration.',
+                    num_ready_workers_running, num_workers_running
+                )
+                self._restart_on_next_plugin_check = False
+                self._last_refresh_time = time.time()
+                self._reload_gunicorn()
 
 
 @cli_utils.action_logging
@@ -962,13 +1096,13 @@ def webserver(args):
 
         run_args = [
             'gunicorn',
-            '-w', str(num_workers),
-            '-k', str(args.workerclass),
-            '-t', str(worker_timeout),
-            '-b', args.hostname + ':' + str(args.port),
-            '-n', 'airflow-webserver',
-            '-p', str(pid),
-            '-c', 'python:airflow.www.gunicorn_config',
+            '--workers', str(num_workers),
+            '--worker-class', str(args.workerclass),
+            '--timeout', str(worker_timeout),
+            '--bind', args.hostname + ':' + str(args.port),
+            '--name', 'airflow-webserver',
+            '--pid', str(pid),
+            '--config', 'python:airflow.www.gunicorn_config',
         ]
 
         if args.access_logfile:
@@ -978,7 +1112,7 @@ def webserver(args):
             run_args += ['--error-logfile', str(args.error_logfile)]
 
         if args.daemon:
-            run_args += ['-D']
+            run_args += ['--deamon']
 
         if ssl_cert:
             run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]
@@ -995,12 +1129,14 @@ def webserver(args):
 
         def monitor_gunicorn(gunicorn_master_proc):
             # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
-            if conf.getint('webserver', 'worker_refresh_interval') > 0:
-                master_timeout = conf.getint('webserver', 'web_server_master_timeout')
-                restart_workers(gunicorn_master_proc, num_workers, master_timeout)
-            else:
-                while True:
-                    time.sleep(1)
+            GunicornMonitor(
+                gunicorn_master_proc=gunicorn_master_proc,
+                num_workers_expected=num_workers,
+                master_timeout=conf.getint('webserver', 'web_server_master_timeout'),
+                worker_refresh_interval=conf.getint('webserver', 'worker_refresh_interval', fallback=10),
+                worker_refresh_batch_size=conf.getint('webserver', 'worker_refresh_batch_size', fallback=1),
+                reload_on_plugin_change=conf.getint('webserver', 'reload_on_plugin_change', fallback=1),
+            ).start()
 
         if args.daemon:
             base, ext = os.path.splitext(pid)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index e32b8cc..1f697b1 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -699,6 +699,14 @@
       type: string
       example: ~
       default: "30"
+    - name: reload_on_plugin_change
+      description: |
+        If set to True, Airflow will track files in plugins_follder directory. When it detects changes,
+        then reload the gunicorn.
+      version_added: ~
+      type: boolean
+      example: ~
+      default: False
     - name: secret_key
       description: |
         Secret key used to run your flask app
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index c75d3ae..cfc92ad 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -341,6 +341,10 @@ worker_refresh_batch_size = 1
 # Number of seconds to wait before refreshing a batch of workers.
 worker_refresh_interval = 30
 
+# If set to True, Airflow will track files in plugins_follder directory. When it detects changes,
+# then reload the gunicorn.
+reload_on_plugin_change =
+
 # Secret key used to run your flask app
 # It should be as random as possible
 secret_key = temporary_key
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index a2a81ac..5748057 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -17,6 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import contextlib
+import errno
 import io
 
 import logging
@@ -28,8 +29,8 @@ from six import StringIO, PY2
 import sys
 
 from datetime import datetime, timedelta, time
-from mock import patch, Mock, MagicMock
-from time import sleep
+from mock import patch, MagicMock
+from time import sleep, time as timetime
 import psutil
 import pytz
 import subprocess
@@ -37,9 +38,10 @@ import pytest
 from argparse import Namespace
 from airflow import settings
 import airflow.bin.cli as cli
-from airflow.bin.cli import get_num_ready_workers_running, run, get_dag
+from airflow.bin.cli import run, get_dag
 from airflow.models import TaskInstance
 from airflow.utils import timezone
+from airflow.utils.file import TemporaryDirectory
 from airflow.utils.state import State
 from airflow.settings import Session
 from airflow import models
@@ -154,39 +156,6 @@ class TestCLI(unittest.TestCase):
         cls.dagbag = models.DagBag(include_examples=True)
         cls.parser = cli.CLIFactory.get_parser()
 
-    def setUp(self):
-        self.gunicorn_master_proc = Mock(pid=None)
-        self.children = MagicMock()
-        self.child = MagicMock()
-        self.process = MagicMock()
-
-    def test_ready_prefix_on_cmdline(self):
-        self.child.cmdline.return_value = [settings.GUNICORN_WORKER_READY_PREFIX]
-        self.process.children.return_value = [self.child]
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 1)
-
-    def test_ready_prefix_on_cmdline_no_children(self):
-        self.process.children.return_value = []
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
-    def test_ready_prefix_on_cmdline_zombie(self):
-        self.child.cmdline.return_value = []
-        self.process.children.return_value = [self.child]
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
-    def test_ready_prefix_on_cmdline_dead_process(self):
-        self.child.cmdline.side_effect = psutil.NoSuchProcess(11347)
-        self.process.children.return_value = [self.child]
-
-        with patch('psutil.Process', return_value=self.process):
-            self.assertEqual(get_num_ready_workers_running(self.gunicorn_master_proc), 0)
-
     def test_cli_webserver_debug(self):
         env = os.environ.copy()
         p = psutil.Popen(["airflow", "webserver", "-d"], env=env)
@@ -847,3 +816,195 @@ class TestShowInfo(unittest.TestCase):
         self.assertIn("https://file.io/TEST", temp_stdout.getvalue())
         content = mock_requests.post.call_args[1]["files"]["file"][1]
         self.assertIn("postgresql+psycopg2://p...s:PASSWORD@postgres/airflow", content)
+
+
+class TestGunicornMonitor(unittest.TestCase):
+    """Tests for the scaling/reload decisions made by ``GunicornMonitor._check_workers``."""
+
+    def setUp(self):
+        self.gunicorn_master_proc = mock.Mock(pid=2137)
+        self.monitor = cli.GunicornMonitor(
+            gunicorn_master_proc=self.gunicorn_master_proc,
+            num_workers_expected=4,
+            master_timeout=60,
+            worker_refresh_interval=60,
+            worker_refresh_batch_size=2,
+            reload_on_plugin_change=True,
+        )
+        self.addCleanup(mock.patch.stopall)  # undo the patchers started below after each test
+        mock.patch.object(self.monitor, '_generate_plugin_state', return_value={}).start()
+        mock.patch.object(self.monitor, '_get_num_ready_workers_running', return_value=4).start()
+        mock.patch.object(self.monitor, '_get_num_workers_running', return_value=4).start()
+        mock.patch.object(self.monitor, '_spawn_new_workers', return_value=None).start()
+        mock.patch.object(self.monitor, '_kill_old_workers', return_value=None).start()
+        mock.patch.object(self.monitor, '_reload_gunicorn', return_value=None).start()
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_wait_for_workers_to_start(self, mock_sleep):
+        self.monitor._get_num_ready_workers_running.return_value = 0
+        self.monitor._get_num_workers_running.return_value = 4
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_kill_excess_workers(self, mock_sleep):
+        self.monitor._get_num_ready_workers_running.return_value = 10
+        self.monitor._get_num_workers_running.return_value = 10
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_called_once_with(2)  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_start_new_workers_when_missing(self, mock_sleep):
+        self.monitor._get_num_ready_workers_running.return_value = 2
+        self.monitor._get_num_workers_running.return_value = 2
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_called_once_with(2)  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_start_new_workers_when_refresh_interval_has_passed(self, mock_sleep):
+        self.monitor._last_refresh_time -= 200
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_called_once_with(2)  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+        self.assertAlmostEqual(self.monitor._last_refresh_time, timetime(), delta=5)
+
+    @mock.patch('airflow.bin.cli.time.sleep')
+    def test_should_reload_when_plugin_has_been_changed(self, mock_sleep):
+        # The first observed plugin state must not trigger a reload.
+        self.monitor._generate_plugin_state.return_value = {'AA': 12}
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+        # The state changes again, so a reload is still not expected: the monitor
+        # must see the same state on two consecutive checks before reloading.
+        self.monitor._generate_plugin_state.return_value = {'AA': 32}
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_not_called()  # pylint: disable=no-member
+
+        # Same state as the previous check: gunicorn is now reloaded exactly once.
+        self.monitor._generate_plugin_state.return_value = {'AA': 32}
+        self.monitor._check_workers()
+        self.monitor._spawn_new_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._kill_old_workers.assert_not_called()  # pylint: disable=no-member
+        self.monitor._reload_gunicorn.assert_called_once_with()  # pylint: disable=no-member
+        self.assertAlmostEqual(self.monitor._last_refresh_time, timetime(), delta=5)
+
+
+class TestGunicornMonitorGeneratePluginState(unittest.TestCase):
+    """Tests for ``GunicornMonitor._generate_plugin_state`` on a temporary plugins folder."""
+    @staticmethod
+    def _prepare_test_file(filepath, size):
+        """Write ``size`` 'A' characters to ``filepath``, creating missing parent dirs."""
+        try:
+            os.makedirs(os.path.dirname(filepath))
+        except OSError as e:
+            if e.errno != errno.EEXIST:  # an already existing directory is fine
+                raise
+        with open(filepath, "w") as test_file:
+            test_file.write("A" * size)
+
+    def test_should_detect_changes_in_directory(self):
+        with TemporaryDirectory(prefix="tmp") as tempdir, \
+                mock.patch("airflow.bin.cli.settings.PLUGINS_FOLDER", tempdir):
+            self._prepare_test_file("{}/file1.txt".format(tempdir), 100)
+            self._prepare_test_file("{}/nested/nested/nested/nested/file2.txt".format(tempdir), 200)
+            self._prepare_test_file("{}/file3.txt".format(tempdir), 300)
+
+            monitor = cli.GunicornMonitor(
+                gunicorn_master_proc=mock.MagicMock(),
+                num_workers_expected=4,
+                master_timeout=60,
+                worker_refresh_interval=60,
+                worker_refresh_batch_size=2,
+                reload_on_plugin_change=True,
+            )
+
+            # When the files have not changed, the result should be constant
+            state_a = monitor._generate_plugin_state()
+            state_b = monitor._generate_plugin_state()
+
+            self.assertEqual(state_a, state_b)
+            self.assertEqual(3, len(state_a))
+
+            # Should detect new file
+            self._prepare_test_file("{}/file4.txt".format(tempdir), 400)
+
+            state_c = monitor._generate_plugin_state()
+
+            self.assertNotEqual(state_b, state_c)
+            self.assertEqual(4, len(state_c))
+
+            # Should detect changes in files
+            self._prepare_test_file("{}/file4.txt".format(tempdir), 450)
+
+            state_d = monitor._generate_plugin_state()
+
+            self.assertNotEqual(state_c, state_d)
+            self.assertEqual(4, len(state_d))
+
+            # Should support large files (and detect the change relative to the previous state)
+            self._prepare_test_file("{}/file4.txt".format(tempdir), 4000000)
+
+            state_e = monitor._generate_plugin_state()
+
+            self.assertNotEqual(state_d, state_e)
+            self.assertEqual(4, len(state_e))
+
+
+class TestCLIGetNumReadyWorkersRunning(unittest.TestCase):
+    """Tests for ``GunicornMonitor._get_num_ready_workers_running`` using a mocked psutil tree."""
+
+    def setUp(self):
+        self.gunicorn_master_proc = mock.Mock(pid=2137)
+        self.child = mock.MagicMock()
+        self.process = mock.MagicMock()
+        self.monitor = cli.GunicornMonitor(
+            gunicorn_master_proc=self.gunicorn_master_proc,
+            num_workers_expected=4,
+            master_timeout=60,
+            worker_refresh_interval=60,
+            worker_refresh_batch_size=2,
+            reload_on_plugin_change=True,
+        )
+
+    def test_ready_prefix_on_cmdline(self):
+        """A child whose cmdline carries the ready prefix is counted as a ready worker."""
+        self.child.cmdline.return_value = [settings.GUNICORN_WORKER_READY_PREFIX]
+        self.process.children.return_value = [self.child]
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 1)
+
+    def test_ready_prefix_on_cmdline_no_children(self):
+        """Without child processes there are no ready workers."""
+        self.process.children.return_value = []
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
+
+    def test_ready_prefix_on_cmdline_zombie(self):
+        """A child with an empty cmdline (e.g. a zombie) is not counted as ready."""
+        self.child.cmdline.return_value = []
+        self.process.children.return_value = [self.child]
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
+
+    def test_ready_prefix_on_cmdline_dead_process(self):
+        """A child that vanishes while its cmdline is read is ignored instead of raising."""
+        self.child.cmdline.side_effect = psutil.NoSuchProcess(11347)
+        self.process.children.return_value = [self.child]
+
+        with mock.patch('psutil.Process', return_value=self.process):
+            self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)