Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/06/29 11:36:23 UTC

[GitHub] [airflow] mik-laj commented on a change in pull request #8997: Reload gunicorn when plugins have been changed

mik-laj commented on a change in pull request #8997:
URL: https://github.com/apache/airflow/pull/8997#discussion_r446901937



##########
File path: airflow/cli/commands/webserver_command.py
##########
@@ -83,92 +62,238 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
     master process, which increases and decreases the number of child workers
     respectively. Gunicorn guarantees that on TTOU workers are terminated
     gracefully and that the oldest worker is terminated.
-    """
 
-    def wait_until_true(fn, timeout=0):
+    :param gunicorn_master_proc: Handle for the main Gunicorn process
+    :param num_workers_expected: Number of workers to run the Gunicorn web server
+    :param master_timeout: Number of seconds the webserver waits before killing a gunicorn master that
+        doesn't respond
+    :param worker_refresh_interval: Number of seconds to wait before refreshing a batch of workers.
+    :param worker_refresh_batch_size: Number of workers to refresh at a time. When set to 0, worker
+        refresh is disabled. When nonzero, Airflow periodically refreshes webserver workers by
+        bringing up new ones and killing old ones.
+    :param reload_on_plugin_change: If set to True, Airflow will track files in the plugins_folder directory.
+        When it detects changes, it will reload gunicorn.
+    """
+    def __init__(
+        self,
+        gunicorn_master_proc: psutil.Process,
+        num_workers_expected: int,
+        master_timeout: int,
+        worker_refresh_interval: int,
+        worker_refresh_batch_size: int,
+        reload_on_plugin_change: bool
+    ):
+        super(GunicornMonitor, self).__init__()
+        self.gunicorn_master_proc = gunicorn_master_proc
+        self.num_workers_expected = num_workers_expected
+        self.master_timeout = master_timeout
+        self.worker_refresh_interval = worker_refresh_interval
+        self.worker_refresh_batch_size = worker_refresh_batch_size
+        self.reload_on_plugin_change = reload_on_plugin_change
+
+        self._num_workers_running = 0
+        self._num_ready_workers_running = 0
+        self._last_refresh_time = time.time() if worker_refresh_interval > 0 else None
+        self._last_plugin_state = self._generate_plugin_state() if reload_on_plugin_change else None
+        self._restart_on_next_plugin_check = False
+
+    def _generate_plugin_state(self) -> Dict[str, float]:
+        """
+        Generate a dict of filenames and last-modification times for all files in the
+        settings.PLUGINS_FOLDER directory.
+        """
+        if not settings.PLUGINS_FOLDER:
+            return {}
+        all_filenames: List[str] = []
+        for (root, _, filenames) in os.walk(settings.PLUGINS_FOLDER):
+            all_filenames.extend(os.path.join(root, f) for f in filenames)
+        plugin_state = {f: os.path.getmtime(f) for f in sorted(all_filenames)}
+        return plugin_state
+
+    def _get_num_ready_workers_running(self) -> int:
+        """Returns number of ready Gunicorn workers by looking for READY_PREFIX in process name"""
+        workers = psutil.Process(self.gunicorn_master_proc.pid).children()
+
+        def ready_prefix_on_cmdline(proc):
+            try:
+                cmdline = proc.cmdline()
+                if len(cmdline) > 0:  # pylint: disable=len-as-condition
+                    return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
+            except psutil.NoSuchProcess:
+                pass
+            return False
+
+        ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)]
+        return len(ready_workers)
+
+    def _get_num_workers_running(self) -> int:
+        """Returns number of running Gunicorn workers processes"""
+        workers = psutil.Process(self.gunicorn_master_proc.pid).children()
+        return len(workers)
+
+    def _wait_until_true(self, fn, timeout: int = 0) -> None:
         """
         Sleeps until fn is true
         """
         start_time = time.time()
         while not fn():
             if 0 < timeout <= time.time() - start_time:
                 raise AirflowWebServerTimeout(
-                    "No response from gunicorn master within {0} seconds"
-                    .format(timeout))
-            time.sleep(0.1)
+                    "No response from gunicorn master within {0} seconds".format(timeout)
+                )
+            sleep(0.1)
 
-    def start_refresh(gunicorn_master_proc):
-        batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
-        log.debug('%s doing a refresh of %s workers', state, batch_size)
-        sys.stdout.flush()
-        sys.stderr.flush()
+    def _spawn_new_workers(self, count: int) -> None:
+        """
+        Send signal to spawn new worker processes.
 
+        :param count: The number of workers to spawn
+        """
         excess = 0
-        for _ in range(batch_size):
-            gunicorn_master_proc.send_signal(signal.SIGTTIN)
+        for _ in range(count):
+            # TTIN: Increment the number of processes by one
+            self.gunicorn_master_proc.send_signal(signal.SIGTTIN)
             excess += 1
-            wait_until_true(lambda: num_workers_expected + excess ==
-                            get_num_workers_running(gunicorn_master_proc),
-                            master_timeout)
-
-    try:  # pylint: disable=too-many-nested-blocks
-        wait_until_true(lambda: num_workers_expected ==
-                        get_num_workers_running(gunicorn_master_proc),
-                        master_timeout)
-        while True:
-            num_workers_running = get_num_workers_running(gunicorn_master_proc)
-            num_ready_workers_running = \
-                get_num_ready_workers_running(gunicorn_master_proc)
-
-            state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running)
-
-            # Whenever some workers are not ready, wait until all workers are ready
-            if num_ready_workers_running < num_workers_running:
-                log.debug('%s some workers are starting up, waiting...', state)
-                sys.stdout.flush()
-                time.sleep(1)
-
-            # Kill a worker gracefully by asking gunicorn to reduce number of workers
-            elif num_workers_running > num_workers_expected:
-                excess = num_workers_running - num_workers_expected
-                log.debug('%s killing %s workers', state, excess)
-
-                for _ in range(excess):
-                    gunicorn_master_proc.send_signal(signal.SIGTTOU)
-                    excess -= 1
-                    wait_until_true(lambda: num_workers_expected + excess ==
-                                    get_num_workers_running(gunicorn_master_proc),
-                                    master_timeout)
-
-            # Start a new worker by asking gunicorn to increase number of workers
-            elif num_workers_running == num_workers_expected:
-                refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
-                log.debug(
-                    '%s sleeping for %ss starting doing a refresh...',
-                    state, refresh_interval
+            self._wait_until_true(
+                lambda: self.num_workers_expected + excess == self._get_num_workers_running(),
+                timeout=self.master_timeout
+            )
+
+    def _kill_old_workers(self, count: int) -> None:
+        """
+        Send signal to kill the worker.
+
+        :param count: The number of workers to kill
+        """
+        for _ in range(count):
+            count -= 1
+            # TTOU: Decrement the number of processes by one
+            self.gunicorn_master_proc.send_signal(signal.SIGTTOU)
+            self._wait_until_true(
+                lambda: self.num_workers_expected + count == self._get_num_workers_running(),
+                timeout=self.master_timeout)
+
+    def _reload_gunicorn(self) -> None:
+        """"
+        Send signal to reload the gunciron configuration. When gunciorn receive signals, it reload the
+        configuration, start the new worker processes with a new configuration and gracefully
+        shutdown older workers.
+        """
+        # HUP: Reload the configuration.
+        self.gunicorn_master_proc.send_signal(signal.SIGHUP)
+        sleep(1)
+        self._wait_until_true(
+            lambda: self.num_workers_expected == self._get_num_workers_running(),
+            timeout=self.master_timeout
+        )
+
+    def start(self) -> NoReturn:
+        """
+        Starts monitoring the webserver.
+        """
+        try:  # pylint: disable=too-many-nested-blocks
+            self._wait_until_true(
+                lambda: self.num_workers_expected == self._get_num_workers_running(),
+                timeout=self.master_timeout
+            )
+            while True:
+                if self.gunicorn_master_proc.poll() is not None:
+                    sys.exit(self.gunicorn_master_proc.returncode)
+                self._check_workers()
+                # Throttle loop
+                sleep(1)
+
+        except (AirflowWebServerTimeout, OSError) as err:
+            self.log.error(err)
+            self.log.error("Shutting down webserver")
+            try:
+                self.gunicorn_master_proc.terminate()
+                self.gunicorn_master_proc.wait()
+            finally:
+                sys.exit(1)
+
+    def _check_workers(self) -> None:
+        num_workers_running = self._get_num_workers_running()
+        num_ready_workers_running = self._get_num_ready_workers_running()
+
+        # Whenever some workers are not ready, wait until all workers are ready
+        if num_ready_workers_running < num_workers_running:
+            self.log.debug(
+                '[%d / %d] Some workers are starting up, waiting...',
+                num_ready_workers_running, num_workers_running
+            )
+            sleep(1)
+            return
+
+        # If there are too many workers, kill excess workers gracefully by asking gunicorn to reduce
+        # the number of workers
+        if num_workers_running > self.num_workers_expected:
+            excess = min(num_workers_running - self.num_workers_expected, self.worker_refresh_batch_size)
+            self.log.debug(
+                '[%d / %d] Killing %s workers', num_ready_workers_running, num_workers_running, excess
+            )
+            self._kill_old_workers(excess)
+            return
+
+        # If there are too few workers, start a new worker by asking gunicorn
+        # to increase number of workers
+        if num_workers_running < self.num_workers_expected:
+            self.log.error(
+                "[%d / %d] Some workers seem to have died and gunicorn did not restart "
+                "them as expected",
+                num_ready_workers_running, num_workers_running
+            )
+            sleep(10)
+            num_workers_running = self._get_num_workers_running()
+            if num_workers_running < self.num_workers_expected:
+                new_worker_count = min(
+                    self.num_workers_expected - num_workers_running, self.worker_refresh_batch_size
                 )
-                time.sleep(refresh_interval)
-                start_refresh(gunicorn_master_proc)
-
-            else:
-                # num_ready_workers_running == num_workers_running < num_workers_expected
-                log.error((
-                    "%s some workers seem to have died and gunicorn"
-                    "did not restart them as expected"
-                ), state)
-                time.sleep(10)
-                if len(
-                    psutil.Process(gunicorn_master_proc.pid).children()
-                ) < num_workers_expected:
-                    start_refresh(gunicorn_master_proc)
-    except (AirflowWebServerTimeout, OSError) as err:
-        log.error(err)
-        log.error("Shutting down webserver")
-        try:
-            gunicorn_master_proc.terminate()
-            gunicorn_master_proc.wait()
-        finally:
-            sys.exit(1)
+                self.log.debug(
+                    '[%d / %d] Spawning %d workers',
+                    num_ready_workers_running, num_workers_running, new_worker_count
+                )
+                self._spawn_new_workers(new_worker_count)
+            return
+
+        # Now the number of running and expected workers should be equal
+
+        # If workers should be restarted periodically,
+        if self.worker_refresh_interval > 0 and self._last_refresh_time:
+            # and the last refresh was long enough ago, refresh a batch of workers
+            last_refresh_diff = (time.time() - self._last_refresh_time)
+            if self.worker_refresh_interval < last_refresh_diff:
+                num_new_workers = self.worker_refresh_batch_size
+                self.log.debug(
+                    '[%d / %d] Starting a refresh. Spawning %d new workers.',
+                    num_ready_workers_running, num_workers_running, num_new_workers
+                )
+                self._spawn_new_workers(num_new_workers)
+                self._last_refresh_time = time.time()
+                return
+
+        # If we should check the plugins directory for changes,
+        if self.reload_on_plugin_change:
+            # compare the previous and current contents of the directory
+            new_state = self._generate_plugin_state()
+            # If it has changed, wait until its contents are fully saved.
+            if new_state != self._last_plugin_state:

Review comment:
       The value of the `_last_plugin_state` field is always set in the constructor when observing changes in the plugins directory is enabled.
   https://github.com/apache/airflow/pull/8997/files#diff-71e5f1bb6f81ad283bca26aad652ae12R97
   ```
   self._last_plugin_state = self._generate_plugin_state() if reload_on_plugin_change else None
   ```
   It will not affect performance either, because it runs in a separate process. We first run gunicorn as a separate process, and then we begin to observe the workers. Once all workers are up, we start to observe the plugins directory and regularly refresh the workers. In the meantime, while the workers are starting, we calculate the initial state of the directory.
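
   To make the mechanism concrete, here is a minimal standalone sketch of the same idea: snapshot the modification times of everything under the plugins directory and compare snapshots on each check. The names `generate_plugin_state` and `watch_plugins` and the polling loop are illustrative only, not the PR's exact code; in the PR the comparison happens inside `_check_workers()` and a detected change leads to `_reload_gunicorn()` sending SIGHUP to the gunicorn master.
    ```python
    import os
    import time
    from typing import Dict


    def generate_plugin_state(plugins_dir: str) -> Dict[str, float]:
        """Map every file under plugins_dir to its last-modification time."""
        if not plugins_dir:
            return {}
        state: Dict[str, float] = {}
        for root, _, filenames in os.walk(plugins_dir):
            for name in filenames:
                path = os.path.join(root, name)
                state[path] = os.path.getmtime(path)
        return state


    def watch_plugins(plugins_dir: str, interval: float = 1.0) -> None:
        """Poll the directory and report whenever its contents change."""
        last_state = generate_plugin_state(plugins_dir)
        while True:
            time.sleep(interval)
            new_state = generate_plugin_state(plugins_dir)
            if new_state != last_state:
                # In the PR this is the point where the monitor would trigger a
                # gunicorn reload (SIGHUP) instead of just printing.
                print("plugins changed:", set(new_state) ^ set(last_state) or "modified files")
                last_state = new_state
    ```
   Under that assumption, one `os.walk` plus `getmtime` per monitor loop is cheap, and because it runs in the separate monitor process it does not take any time away from the webserver workers themselves.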




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org