You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2016/08/08 18:27:10 UTC

incubator-airflow git commit: [AIRFLOW-276] Gunicorn rolling restart

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 62768bc08 -> 9d254a317


[AIRFLOW-276] Gunicorn rolling restart

- Tell gunicorn to prepend `[ready]` to each worker's process name once the worker is ready to serve requests, i.e. after it has parsed the DAGs folder
- The Airflow CLI runs gunicorn as a child process instead of `execvp`-ing over itself
- The Airflow CLI monitors the gunicorn worker processes and restarts them by sending TTIN/TTOU signals to the gunicorn master process
- Fix bug where `conf.get('webserver', 'workers')` and `conf.get('webserver', 'webserver_worker_timeout')` were ignored

- An alternative implementation, https://github.com/apache/incubator-airflow/pull/1684/files, does the same thing but provides the worker-restart script separately for the user to run

- Start airflow, observe that workers are restarted
- Add new dags to dags folder and check that they show up
- Run `siege` against airflow while server is restarting and confirm that all requests succeed
- Run with `worker_refresh_batch_size` set to 0, 1 and 4

Closes #1685 from zodiac/xuanji_gunicorn_rolling_restart_2


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9d254a31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9d254a31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9d254a31

Branch: refs/heads/master
Commit: 9d254a317dd54f555270ca568aff1cd0500e1e53
Parents: 62768bc
Author: Li Xuanji <xu...@gmail.com>
Authored: Mon Aug 8 11:26:28 2016 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Mon Aug 8 11:26:38 2016 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py             | 157 ++++++++++++++++++++++++++++++++----
 airflow/configuration.py       |  12 ++-
 airflow/settings.py            |   5 ++
 airflow/www/app.py             |   3 +
 airflow/www/gunicorn_config.py |  23 ++++++
 scripts/ci/requirements.txt    |   1 +
 6 files changed, 185 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d254a31/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 0d35661..76777a0 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -33,16 +33,19 @@ import signal
 import sys
 import threading
 import traceback
+import time
+import psutil
 
 import airflow
 from airflow import jobs, settings
 from airflow import configuration as conf
+from airflow.exceptions import AirflowException
 from airflow.executors import DEFAULT_EXECUTOR
 from airflow.models import DagModel, DagBag, TaskInstance, DagPickle, DagRun, Variable
 from airflow.utils import db as db_utils
 from airflow.utils import logging as logging_utils
 from airflow.utils.state import State
-from airflow.exceptions import AirflowException
+from airflow.www.app import cached_app
 
 DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
 
@@ -500,16 +503,127 @@ def clear(args):
         include_subdags=not args.exclude_subdags)
 
 
+def restart_workers(gunicorn_master_proc, num_workers_expected):
+    """
+    Runs forever, monitoring the child processes of @gunicorn_master_proc (the
+    subprocess.Popen running the gunicorn master) and restarting workers
+    occasionally so that @num_workers_expected workers are normally running.
+
+    Each iteration of the loop traverses one edge of this state transition
+    diagram, where each state (node) represents
+    [ num_ready_workers_running / num_workers_running ]. We expect most time to
+    be spent in [n / n]. `bs` is the setting webserver.worker_refresh_batch_size.
+
+    The horizontal transition at ? happens after the new worker parses all the
+    dags (so it could take a while!)
+
+       V ────────────────────────────────────────────────────────────────────────┐
+    [n / n] ──TTIN──> [ [n, n+bs) / n + bs ]  ────?───> [n + bs / n + bs] ──TTOU─┘
+       ^                          ^───────────────┘
+       │
+       │      ┌────────────────v
+       └──────┴────── [ [0, n) / n ] <─── start
+
+    We change the number of workers by sending TTIN and TTOU to the gunicorn
+    master process, which increases and decreases the number of child workers
+    respectively. Gunicorn guarantees that on TTOU workers are terminated
+    gracefully and that the oldest worker is terminated.
+    """
+
+    # Callers may pass conf.get()'s string value; comparisons below need an int
+    num_workers_expected = int(num_workers_expected)
+
+    def wait_until_true(fn):
+        """Sleeps (polling every 0.1s) until fn() returns a truthy value."""
+        while not fn():
+            time.sleep(0.1)
+
+    def get_num_workers_running(gunicorn_master_proc):
+        workers = psutil.Process(gunicorn_master_proc.pid).children()
+        return len(workers)
+
+    def is_ready_worker(proc):
+        try:
+            return settings.GUNICORN_WORKER_READY_PREFIX in proc.cmdline()[0]
+        except (psutil.NoSuchProcess, IndexError):
+            # Worker already exited (e.g. after a TTOU) or cmdline is empty.
+            return False
+
+    def get_num_ready_workers_running(gunicorn_master_proc):
+        workers = psutil.Process(gunicorn_master_proc.pid).children()
+        return sum(1 for proc in workers if is_ready_worker(proc))
+
+    def start_refresh(gunicorn_master_proc, state):
+        batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
+        logging.debug('%s doing a refresh of %s workers', state, batch_size)
+        sys.stdout.flush()
+        sys.stderr.flush()
+
+        excess = 0
+        for _ in range(batch_size):
+            gunicorn_master_proc.send_signal(signal.SIGTTIN)
+            excess += 1
+            wait_until_true(lambda: num_workers_expected + excess ==
+                get_num_workers_running(gunicorn_master_proc))
+
+    wait_until_true(lambda: num_workers_expected ==
+        get_num_workers_running(gunicorn_master_proc))
+
+    while True:
+        num_workers_running = get_num_workers_running(gunicorn_master_proc)
+        num_ready_workers_running = get_num_ready_workers_running(gunicorn_master_proc)
+
+        state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running)
+
+        # Whenever some workers are not ready, wait until all workers are ready
+        if num_ready_workers_running < num_workers_running:
+            logging.debug('%s some workers are starting up, waiting...', state)
+            sys.stdout.flush()
+            time.sleep(1)
+
+        # Kill a worker gracefully by asking gunicorn to reduce number of workers
+        elif num_workers_running > num_workers_expected:
+            excess = num_workers_running - num_workers_expected
+            logging.debug('%s killing %s workers', state, excess)
+
+            for _ in range(excess):
+                gunicorn_master_proc.send_signal(signal.SIGTTOU)
+                excess -= 1
+                wait_until_true(lambda: num_workers_expected + excess ==
+                    get_num_workers_running(gunicorn_master_proc))
+
+        # Start a new worker by asking gunicorn to increase number of workers
+        elif num_workers_running == num_workers_expected:
+            refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
+            logging.debug(
+                '%s sleeping for %ss starting doing a refresh...',
+                state, refresh_interval
+            )
+            time.sleep(refresh_interval)
+            start_refresh(gunicorn_master_proc, state)
+
+        else:
+            # num_ready_workers_running == num_workers_running < num_workers_expected
+            logging.error((
+                "%s some workers seem to have died and gunicorn "
+                "did not restart them as expected"
+            ), state)
+            time.sleep(10)
+            if get_num_workers_running(gunicorn_master_proc) < num_workers_expected:
+                start_refresh(gunicorn_master_proc, state)
+
+
 def webserver(args):
+
     print(settings.HEADER)
 
-    from airflow.www.app import cached_app
     app = cached_app(conf)
     access_logfile = args.access_logfile or conf.get('webserver', 'access_logfile')
     error_logfile = args.error_logfile or conf.get('webserver', 'error_logfile')
-    workers = args.workers or conf.get('webserver', 'workers')
+    num_workers = args.workers or conf.get('webserver', 'workers')
     worker_timeout = (args.worker_timeout or
                       conf.get('webserver', 'webserver_worker_timeout'))
+
     if args.debug:
         print(
             "Starting the web server on port {0} and host {1}.".format(
@@ -520,7 +634,7 @@ def webserver(args):
         print(
             textwrap.dedent('''\
                 Running the Gunicorn Server with:
-                Workers: {workers} {args.workerclass}
+                Workers: {num_workers} {args.workerclass}
                 Host: {args.hostname}:{args.port}
                 Timeout: {worker_timeout}
                 Logfiles: {access_logfile} {error_logfile}
@@ -529,12 +643,13 @@ def webserver(args):
 
         run_args = [
             'gunicorn',
-            '-w ' + str(args.workers),
-            '-k ' + str(args.workerclass),
-            '-t ' + str(args.worker_timeout),
-            '-b ' + args.hostname + ':' + str(args.port),
-            '-n ' + 'airflow-webserver',
-            '-p ' + str(pid),
+            '-w', str(num_workers),
+            '-k', str(args.workerclass),
+            '-t', str(worker_timeout),
+            '-b', args.hostname + ':' + str(args.port),
+            '-n', 'airflow-webserver',
+            '-p', str(pid),
+            '-c', 'airflow.www.gunicorn_config'
         ]
 
         if args.access_logfile:
@@ -546,11 +661,23 @@ def webserver(args):
         if args.daemon:
             run_args += ["-D"]
 
-        module = "airflow.www.app:cached_app()".encode()
-        run_args += [module]
-        os.execvp(
-            'gunicorn', run_args
-        )
+        run_args += ["airflow.www.app:cached_app()"]
+
+        gunicorn_master_proc = subprocess.Popen(run_args)
+
+        def kill_proc(dummy_signum, dummy_frame):
+            gunicorn_master_proc.terminate()
+            gunicorn_master_proc.wait()
+            sys.exit(0)
+
+        signal.signal(signal.SIGINT, kill_proc)
+        signal.signal(signal.SIGTERM, kill_proc)
+
+        # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
+        if conf.getint('webserver', 'worker_refresh_interval') > 0:
+            restart_workers(gunicorn_master_proc, num_workers)
+        else:
+            while True: time.sleep(1)
 
 
 def scheduler(args):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d254a31/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 9313366..31b8f36 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -121,6 +121,8 @@ defaults = {
         'web_server_host': '0.0.0.0',
         'web_server_port': '8080',
         'web_server_worker_timeout': 120,
+        'worker_refresh_batch_size': 1,
+        'worker_refresh_interval': 30,
         'authenticate': False,
         'filter_by_owner': False,
         'owner_mode': 'user',
@@ -282,9 +284,17 @@ web_server_host = 0.0.0.0
 # The port on which to run the web server
 web_server_port = 8080
 
-# The time the gunicorn webserver waits before timing out on a worker
+# Number of seconds the gunicorn webserver waits before timing out on a worker
 web_server_worker_timeout = 120
 
+# Number of workers to refresh at a time. When set to 0, worker refresh is
+# disabled. When nonzero, airflow periodically refreshes webserver workers by
+# bringing up new ones and killing old ones.
+worker_refresh_batch_size = 1
+
+# Number of seconds to wait before refreshing a batch of workers.
+worker_refresh_interval = 30
+
 # Secret key used to run your flask app
 secret_key = temporary_key
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d254a31/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index ccd77ee..35928ec 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -72,9 +72,14 @@ SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')
 LOGGING_LEVEL = logging.INFO
 DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
 
+# Prefix prepended to a gunicorn worker's process title once it has initialized
+GUNICORN_WORKER_READY_PREFIX = "[ready] "
+
 # can't move this to conf due to ConfigParser interpolation
 LOG_FORMAT = (
     '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
+LOG_FORMAT_WITH_PID = (
+    '[%(asctime)s] [%(process)d] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
 LOG_FORMAT_WITH_THREAD_NAME = (
     '[%(asctime)s] {%(filename)s:%(lineno)d} %(threadName)s %(levelname)s - %(message)s')
 SIMPLE_LOG_FORMAT = '%(asctime)s %(levelname)s - %(message)s'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d254a31/airflow/www/app.py
----------------------------------------------------------------------
diff --git a/airflow/www/app.py b/airflow/www/app.py
index 3953511..10d8420 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -47,6 +47,9 @@ def create_app(config=None):
 
     app.register_blueprint(routes)
 
+    log_format = airflow.settings.LOG_FORMAT_WITH_PID
+    airflow.settings.configure_logging(log_format=log_format)
+
     with app.app_context():
         from airflow.www import views
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d254a31/airflow/www/gunicorn_config.py
----------------------------------------------------------------------
diff --git a/airflow/www/gunicorn_config.py b/airflow/www/gunicorn_config.py
new file mode 100644
index 0000000..e1fd9b6
--- /dev/null
+++ b/airflow/www/gunicorn_config.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import setproctitle
+from airflow import settings
+
+
+def post_worker_init(dummy_worker):
+    """Gunicorn server hook: prefix this worker's process title once ready."""
+    ready_title = settings.GUNICORN_WORKER_READY_PREFIX + setproctitle.getproctitle()
+    setproctitle.setproctitle(ready_title)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d254a31/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index 3317324..5d7b4bb 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -31,6 +31,7 @@ pyhive
 pydruid
 PyOpenSSL
 PySmbClient
+psutil>=4.2.0, <5.0.0
 psycopg2
 python-dateutil
 redis