Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/06/03 15:12:04 UTC

[GitHub] [airflow] potiuk commented on a change in pull request #8997: Reload gunicorn when plugins has been changed

potiuk commented on a change in pull request #8997:
URL: https://github.com/apache/airflow/pull/8997#discussion_r434638254



##########
File path: airflow/cli/commands/webserver_command.py
##########
@@ -83,92 +62,238 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
     master process, which increases and decreases the number of child workers
     respectively. Gunicorn guarantees that on TTOU workers are terminated
     gracefully and that the oldest worker is terminated.
-    """
 
-    def wait_until_true(fn, timeout=0):
+    :param gunicorn_master_proc: Handle for the main Gunicorn process
+    :param num_workers_expected: Number of workers to run the Gunicorn web server
+    :param master_timeout: Number of seconds the webserver waits before killing a gunicorn master that
+        doesn't respond
+    :param worker_refresh_interval: Number of seconds to wait before refreshing a batch of workers.
+    :param worker_refresh_batch_size: Number of workers to refresh at a time. When set to 0, worker
+        refresh is disabled. When nonzero, airflow periodically refreshes webserver workers by
+        bringing up new ones and killing old ones.
+    :param reload_on_plugin_change: If set to True, Airflow will track files in the plugins_folder
+        directory. When it detects changes, it will reload gunicorn.
+    """
+    def __init__(
+        self,
+        gunicorn_master_proc: psutil.Process,
+        num_workers_expected: int,
+        master_timeout: int,
+        worker_refresh_interval: int,
+        worker_refresh_batch_size: int,
+        reload_on_plugin_change: bool
+    ):
+        super().__init__()
+        self.gunicorn_master_proc = gunicorn_master_proc
+        self.num_workers_expected = num_workers_expected
+        self.master_timeout = master_timeout
+        self.worker_refresh_interval = worker_refresh_interval
+        self.worker_refresh_batch_size = worker_refresh_batch_size
+        self.reload_on_plugin_change = reload_on_plugin_change
+
+        self._num_workers_running = 0
+        self._num_ready_workers_running = 0
+        self._last_refresh_time = time.time() if worker_refresh_interval > 0 else None
+        self._last_plugin_state = self._generate_plugin_state() if reload_on_plugin_change else None
+        self._restart_on_next_plugin_check = False
+
+    def _generate_plugin_state(self) -> Dict[str, float]:
+        """
+        Generate a dict mapping filenames to last modification times for all files in the
+        settings.PLUGINS_FOLDER directory.
+        """
+        if not settings.PLUGINS_FOLDER:
+            return {}
+        all_filenames: List[str] = []
+        for (root, _, filenames) in os.walk(settings.PLUGINS_FOLDER):
+            # filenames: List[str] = cast(List[str], filenames)
+            all_filenames.extend(os.path.join(root, f) for f in filenames)

Review comment:
       I think it would be much better to calculate a hash of the files. I think mtime is a bad thing to use for multiple reasons - one of them is that in a number of situations mtime can be modified even though the file was not actually updated (various sync solutions might do it). I always prefer to calculate hashes in this kind of check.
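
       For illustration, a minimal standalone sketch of the content-hash approach suggested above (the function name, digest algorithm, and chunk size are illustrative choices, not from the PR):

           import hashlib
           import os
           from typing import Dict

           def generate_plugin_state(plugins_folder: str) -> Dict[str, str]:
               """Map each file under plugins_folder to a SHA-256 digest of its
               contents, so the comparison survives spurious mtime changes."""
               state: Dict[str, str] = {}
               for root, _, filenames in os.walk(plugins_folder):
                   for name in sorted(filenames):
                       path = os.path.join(root, name)
                       digest = hashlib.sha256()
                       with open(path, 'rb') as file:
                           # Read in chunks so large plugin files are not loaded into memory at once
                           for chunk in iter(lambda: file.read(8192), b''):
                               digest.update(chunk)
                       state[path] = digest.hexdigest()
               return state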

##########
File path: airflow/cli/commands/webserver_command.py
##########
@@ -226,13 +351,13 @@ def webserver(args):
 
         run_args = [
             'gunicorn',
-            '-w', str(num_workers),
-            '-k', str(args.workerclass),
-            '-t', str(worker_timeout),
-            '-b', args.hostname + ':' + str(args.port),
-            '-n', 'airflow-webserver',
-            '-p', pid_file,
-            '-c', 'python:airflow.www.gunicorn_config',
+            '--workers', str(num_workers),

Review comment:
       :heart: 
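
       For reference, these long options map one-to-one onto the short flags in the removed lines (-w/--workers, -k/--worker-class, -t/--timeout, -b/--bind, -n/--name, -p/--pid, -c/--config), so the behavior is unchanged; only readability improves.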

##########
File path: airflow/cli/commands/webserver_command.py
##########
@@ -83,92 +62,238 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
     master process, which increases and decreases the number of child workers
     respectively. Gunicorn guarantees that on TTOU workers are terminated
     gracefully and that the oldest worker is terminated.
-    """
 
-    def wait_until_true(fn, timeout=0):
+    :param gunicorn_master_proc: Handle for the main Gunicorn process
+    :param num_workers_expected: Number of workers to run the Gunicorn web server
+    :param master_timeout: Number of seconds the webserver waits before killing a gunicorn master that
+        doesn't respond
+    :param worker_refresh_interval: Number of seconds to wait before refreshing a batch of workers.
+    :param worker_refresh_batch_size: Number of workers to refresh at a time. When set to 0, worker
+        refresh is disabled. When nonzero, airflow periodically refreshes webserver workers by
+        bringing up new ones and killing old ones.
+    :param reload_on_plugin_change: If set to True, Airflow will track files in the plugins_folder
+        directory. When it detects changes, it will reload gunicorn.
+    """
+    def __init__(
+        self,
+        gunicorn_master_proc: psutil.Process,
+        num_workers_expected: int,
+        master_timeout: int,
+        worker_refresh_interval: int,
+        worker_refresh_batch_size: int,
+        reload_on_plugin_change: bool
+    ):
+        super().__init__()
+        self.gunicorn_master_proc = gunicorn_master_proc
+        self.num_workers_expected = num_workers_expected
+        self.master_timeout = master_timeout
+        self.worker_refresh_interval = worker_refresh_interval
+        self.worker_refresh_batch_size = worker_refresh_batch_size
+        self.reload_on_plugin_change = reload_on_plugin_change
+
+        self._num_workers_running = 0
+        self._num_ready_workers_running = 0
+        self._last_refresh_time = time.time() if worker_refresh_interval > 0 else None
+        self._last_plugin_state = self._generate_plugin_state() if reload_on_plugin_change else None
+        self._restart_on_next_plugin_check = False
+
+    def _generate_plugin_state(self) -> Dict[str, float]:
+        """
+        Generate a dict mapping filenames to last modification times for all files in the
+        settings.PLUGINS_FOLDER directory.
+        """
+        if not settings.PLUGINS_FOLDER:
+            return {}
+        all_filenames: List[str] = []
+        for (root, _, filenames) in os.walk(settings.PLUGINS_FOLDER):
+            # filenames: List[str] = cast(List[str], filenames)
+            all_filenames.extend(os.path.join(root, f) for f in filenames)
+        plugin_state = {f: os.path.getmtime(f) for f in sorted(all_filenames)}
+        return plugin_state
+
+    def _get_num_ready_workers_running(self) -> int:
+        """Returns number of ready Gunicorn workers by looking for READY_PREFIX in process name"""
+        workers = psutil.Process(self.gunicorn_master_proc.pid).children()
+
+        def ready_prefix_on_cmdline(proc):
+            try:
+                cmdline = proc.cmdline()
+                if len(cmdline) > 0:  # pylint: disable=len-as-condition
+                    return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
+            except psutil.NoSuchProcess:
+                pass
+            return False
+
+        ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)]
+        return len(ready_workers)
+
+    def _get_num_workers_running(self) -> int:
+        """Returns number of running Gunicorn workers processes"""
+        workers = psutil.Process(self.gunicorn_master_proc.pid).children()
+        return len(workers)
+
+    def _wait_until_true(self, fn, timeout: int = 0) -> None:
         """
         Sleeps until fn is true
         """
         start_time = time.time()
         while not fn():
             if 0 < timeout <= time.time() - start_time:
                 raise AirflowWebServerTimeout(
-                    "No response from gunicorn master within {0} seconds"
-                    .format(timeout))
-            time.sleep(0.1)
+                    "No response from gunicorn master within {0} seconds".format(timeout)
+                )
+            sleep(0.1)
 
-    def start_refresh(gunicorn_master_proc):
-        batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
-        log.debug('%s doing a refresh of %s workers', state, batch_size)
-        sys.stdout.flush()
-        sys.stderr.flush()
+    def _spawn_new_workers(self, count: int) -> None:
+        """
+        Send signal to spawn new workers.
 
+        :param count: The number of workers to spawn
+        """
         excess = 0
-        for _ in range(batch_size):
-            gunicorn_master_proc.send_signal(signal.SIGTTIN)
+        for _ in range(count):
+            # TTIN: Increment the number of processes by one
+            self.gunicorn_master_proc.send_signal(signal.SIGTTIN)
             excess += 1
-            wait_until_true(lambda: num_workers_expected + excess ==
-                            get_num_workers_running(gunicorn_master_proc),
-                            master_timeout)
-
-    try:  # pylint: disable=too-many-nested-blocks
-        wait_until_true(lambda: num_workers_expected ==
-                        get_num_workers_running(gunicorn_master_proc),
-                        master_timeout)
-        while True:
-            num_workers_running = get_num_workers_running(gunicorn_master_proc)
-            num_ready_workers_running = \
-                get_num_ready_workers_running(gunicorn_master_proc)
-
-            state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running)
-
-            # Whenever some workers are not ready, wait until all workers are ready
-            if num_ready_workers_running < num_workers_running:
-                log.debug('%s some workers are starting up, waiting...', state)
-                sys.stdout.flush()
-                time.sleep(1)
-
-            # Kill a worker gracefully by asking gunicorn to reduce number of workers
-            elif num_workers_running > num_workers_expected:
-                excess = num_workers_running - num_workers_expected
-                log.debug('%s killing %s workers', state, excess)
-
-                for _ in range(excess):
-                    gunicorn_master_proc.send_signal(signal.SIGTTOU)
-                    excess -= 1
-                    wait_until_true(lambda: num_workers_expected + excess ==
-                                    get_num_workers_running(gunicorn_master_proc),
-                                    master_timeout)
-
-            # Start a new worker by asking gunicorn to increase number of workers
-            elif num_workers_running == num_workers_expected:
-                refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
-                log.debug(
-                    '%s sleeping for %ss starting doing a refresh...',
-                    state, refresh_interval
+            self._wait_until_true(
+                lambda: self.num_workers_expected + excess == self._get_num_workers_running(),
+                timeout=self.master_timeout
+            )
+
+    def _kill_old_workers(self, count: int) -> None:
+        """
+        Send signal to kill old workers.
+
+        :param count: The number of workers to kill
+        """
+        for _ in range(count):
+            count -= 1
+            # TTOU: Decrement the number of processes by one
+            self.gunicorn_master_proc.send_signal(signal.SIGTTOU)
+            self._wait_until_true(
+                lambda: self.num_workers_expected + count == self._get_num_workers_running(),
+                timeout=self.master_timeout)
+
+    def _reload_gunicorn(self) -> None:
+        """"
+        Send signal to reload the gunciron configuration. When gunciorn receive signals, it reload the
+        configuration, start the new worker processes with a new configuration and gracefully
+        shutdown older workers.
+        """
+        # HUP: Reload the configuration.
+        self.gunicorn_master_proc.send_signal(signal.SIGHUP)
+        sleep(1)
+        self._wait_until_true(
+            lambda: self.num_workers_expected == self._get_num_workers_running(),
+            timeout=self.master_timeout
+        )
+
+    def start(self) -> NoReturn:
+        """
+        Starts monitoring the webserver.
+        """
+        try:  # pylint: disable=too-many-nested-blocks
+            self._wait_until_true(
+                lambda: self.num_workers_expected == self._get_num_workers_running(),
+                timeout=self.master_timeout
+            )
+            while True:
+                if not self.gunicorn_master_proc.is_running():
+                    # psutil.Process has no poll()/returncode; exit once the master has died
+                    sys.exit(1)
+                self._check_workers()
+                # Throttle loop
+                sleep(1)
+
+        except (AirflowWebServerTimeout, OSError) as err:
+            self.log.error(err)
+            self.log.error("Shutting down webserver")
+            try:
+                self.gunicorn_master_proc.terminate()
+                self.gunicorn_master_proc.wait()
+            finally:
+                sys.exit(1)
+
+    def _check_workers(self) -> None:
+        num_workers_running = self._get_num_workers_running()
+        num_ready_workers_running = self._get_num_ready_workers_running()
+
+        # Whenever some workers are not ready, wait until all workers are ready
+        if num_ready_workers_running < num_workers_running:
+            self.log.debug(
+                '[%d / %d] Some workers are starting up, waiting...',
+                num_ready_workers_running, num_workers_running
+            )
+            sleep(1)
+            return
+
+        # If there are too many workers, then kill a worker gracefully by asking gunicorn to reduce
+        # the number of workers
+        if num_workers_running > self.num_workers_expected:
+            excess = min(num_workers_running - self.num_workers_expected, self.worker_refresh_batch_size)
+            self.log.debug(
+                '[%d / %d] Killing %s workers', num_ready_workers_running, num_workers_running, excess
+            )
+            self._kill_old_workers(excess)
+            return
+
+        # If there are too few workers, start a new worker by asking gunicorn
+        # to increase number of workers
+        if num_workers_running < self.num_workers_expected:
+            self.log.error(
+                "[%d / %d] Some workers seem to have died and gunicorn did not restart "
+                "them as expected",
+                num_ready_workers_running, num_workers_running
+            )
+            sleep(10)
+            num_workers_running = self._get_num_workers_running()
+            if num_workers_running < self.num_workers_expected:
+                new_worker_count = min(
+                    self.num_workers_expected - num_workers_running, self.worker_refresh_batch_size
                 )
-                time.sleep(refresh_interval)
-                start_refresh(gunicorn_master_proc)
-
-            else:
-                # num_ready_workers_running == num_workers_running < num_workers_expected
-                log.error((
-                    "%s some workers seem to have died and gunicorn"
-                    "did not restart them as expected"
-                ), state)
-                time.sleep(10)
-                if len(
-                    psutil.Process(gunicorn_master_proc.pid).children()
-                ) < num_workers_expected:
-                    start_refresh(gunicorn_master_proc)
-    except (AirflowWebServerTimeout, OSError) as err:
-        log.error(err)
-        log.error("Shutting down webserver")
-        try:
-            gunicorn_master_proc.terminate()
-            gunicorn_master_proc.wait()
-        finally:
-            sys.exit(1)
+                self.log.debug(
+                    '[%d / %d] Spawning %d workers',
+                    num_ready_workers_running, num_workers_running, new_worker_count
+                )
+                self._spawn_new_workers(new_worker_count)
+            return
+
+        # Now the number of running and expected workers should be equal
+
+        # If workers should be restarted periodically,
+        if self.worker_refresh_interval > 0 and self._last_refresh_time:
+            # and the last refresh was long enough ago, refresh a batch of workers
+            last_refresh_diff = (time.time() - self._last_refresh_time)
+            if self.worker_refresh_interval < last_refresh_diff:
+                num_new_workers = self.worker_refresh_batch_size
+                self.log.debug(
+                    '[%d / %d] Starting doing a refresh. Starting %d workers.',
+                    num_ready_workers_running, num_workers_running, num_new_workers
+                )
+                self._spawn_new_workers(num_new_workers)
+                self._last_refresh_time = time.time()
+                return
+
+        # If we should check the plugins directory,
+        if self.reload_on_plugin_change:
+            # compare the previous and current contents of the directory
+            new_state = self._generate_plugin_state()
+            # If changed, wait until its content is fully saved.
+            if new_state != self._last_plugin_state:

Review comment:
       I think we should check whether _last_plugin_state was actually set before, and not restart everything in that case - otherwise we will pretty much always restart everything immediately after starting, which would make webserver startup take longer.
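
       A minimal sketch of the suggested guard (assuming _last_plugin_state may still be None on the first check; the names follow the PR, but the control flow here is illustrative):

           # Inside _check_workers(), sketch only:
           if self.reload_on_plugin_change:
               new_state = self._generate_plugin_state()
               if self._last_plugin_state is None:
                   # First check after startup: record a baseline, don't reload.
                   self._last_plugin_state = new_state
               elif new_state != self._last_plugin_state:
                   # Contents changed since the last check: schedule a reload.
                   self._restart_on_next_plugin_check = True
                   self._last_plugin_state = new_state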



