You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2016/08/08 18:27:10 UTC
incubator-airflow git commit: [AIRFLOW-276] Gunicorn rolling restart
Repository: incubator-airflow
Updated Branches:
refs/heads/master 62768bc08 -> 9d254a317
[AIRFLOW-276] Gunicorn rolling restart
- Tell gunicorn to prepend `[ready]` to a worker's process name once the worker is ready to serve requests (in particular, after it has parsed the DAGs folder)
- Airflow cli runs gunicorn as a child process instead of `execvp`-ing over itself
- Airflow cli monitors gunicorn worker processes and restarts them by sending TTIN/TTOU signals to the gunicorn master process
- Fix bug where `conf.get('webserver', 'workers')` and `conf.get('webserver', 'webserver_worker_timeout')` were ignored
- Alternatively, https://github.com/apache/incubator-airflow/pull/1684/files does the same thing but the worker-restart script is provided separately for the user to run
- Start airflow, observe that workers are restarted
- Add new dags to dags folder and check that they show up
- Run `siege` against airflow while server is restarting and confirm that all requests succeed
- Run with configuration set to `batch_size = 0`, `batch_size = 1` and `batch_size = 4`
Closes #1685 from zodiac/xuanji_gunicorn_rolling_restart_2
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9d254a31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9d254a31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9d254a31
Branch: refs/heads/master
Commit: 9d254a317dd54f555270ca568aff1cd0500e1e53
Parents: 62768bc
Author: Li Xuanji <xu...@gmail.com>
Authored: Mon Aug 8 11:26:28 2016 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Mon Aug 8 11:26:38 2016 -0700
----------------------------------------------------------------------
airflow/bin/cli.py | 157 ++++++++++++++++++++++++++++++++----
airflow/configuration.py | 12 ++-
airflow/settings.py | 5 ++
airflow/www/app.py | 3 +
airflow/www/gunicorn_config.py | 23 ++++++
scripts/ci/requirements.txt | 1 +
6 files changed, 185 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d254a31/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 0d35661..76777a0 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -33,16 +33,19 @@ import signal
import sys
import threading
import traceback
+import time
+import psutil
import airflow
from airflow import jobs, settings
from airflow import configuration as conf
+from airflow.exceptions import AirflowException
from airflow.executors import DEFAULT_EXECUTOR
from airflow.models import DagModel, DagBag, TaskInstance, DagPickle, DagRun, Variable
from airflow.utils import db as db_utils
from airflow.utils import logging as logging_utils
from airflow.utils.state import State
-from airflow.exceptions import AirflowException
+from airflow.www.app import cached_app
DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
@@ -500,16 +503,127 @@ def clear(args):
include_subdags=not args.exclude_subdags)
+def restart_workers(gunicorn_master_proc, num_workers_expected):
+    """
+    Runs forever, monitoring the child processes of @gunicorn_master_proc and
+    restarting workers occasionally.
+
+    Each iteration of the loop traverses one edge of this state transition
+    diagram, where each state (node) represents
+    [ num_ready_workers_running / num_workers_running ]. We expect most time to
+    be spent in [n / n]. `bs` is the setting webserver.worker_refresh_batch_size.
+
+    The horizontal transition at ? happens after the new worker parses all the
+    dags (so it could take a while!)
+
+       V ────────────────────────────────────────────────────────────────────────┐
+    [n / n] ──TTIN──> [ [n, n+bs) / n + bs ]  ────?───> [n + bs / n + bs] ──TTOU─┘
+       ^                          ^───────────────┘
+       │
+       │      ┌────────────────v
+       └──────┴────── [ [0, n) / n ] <─── start
+
+    We change the number of workers by sending TTIN and TTOU to the gunicorn
+    master process, which increases and decreases the number of child workers
+    respectively. Gunicorn guarantees that on TTOU workers are terminated
+    gracefully and that the oldest worker is terminated.
+
+    :param gunicorn_master_proc: the gunicorn master, as started with
+        subprocess.Popen.
+    :param num_workers_expected: steady-state number of workers. May arrive
+        as a numeric string (e.g. from conf.get); it is converted to int.
+    :raises AirflowException: if the gunicorn master process exits, because
+        there is then nothing left to monitor.
+    """
+    # A string here would never compare equal to the int worker counts below
+    # (hanging the first wait) and `str + int` would raise, so normalise.
+    num_workers_expected = int(num_workers_expected)
+
+    def check_master_alive():
+        """
+        Raises AirflowException once the gunicorn master has exited. Without
+        this, the waits below would spin forever: an exited master has no
+        worker children, so the expected worker count is never reached.
+        """
+        if gunicorn_master_proc.poll() is not None:
+            raise AirflowException(
+                "gunicorn master process {0} exited with code {1}".format(
+                    gunicorn_master_proc.pid, gunicorn_master_proc.returncode))
+
+    def wait_until_true(fn):
+        """
+        Sleeps (polling every 0.1s) until fn() is true, bailing out if the
+        gunicorn master dies in the meantime.
+        """
+        while not fn():
+            check_master_alive()
+            time.sleep(0.1)
+
+    def get_num_workers_running(gunicorn_master_proc):
+        workers = psutil.Process(gunicorn_master_proc.pid).children()
+        return len(workers)
+
+    def get_num_ready_workers_running(gunicorn_master_proc):
+        workers = psutil.Process(gunicorn_master_proc.pid).children()
+
+        def is_ready(proc):
+            # A worker can exit between children() and cmdline() (notably
+            # right after a TTOU), and its cmdline may be empty or unreadable;
+            # count such a process as not ready instead of crashing the monitor.
+            try:
+                cmdline = proc.cmdline()
+            except (psutil.NoSuchProcess, psutil.AccessDenied):
+                return False
+            return bool(cmdline) and (
+                settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0])
+
+        return sum(1 for proc in workers if is_ready(proc))
+
+    def start_refresh(gunicorn_master_proc, state):
+        """
+        Asks gunicorn for worker_refresh_batch_size extra workers, one at a
+        time, waiting for each to be spawned. The main loop later retires the
+        oldest workers with TTOU once the new ones are ready.
+        """
+        batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
+        logging.debug('%s doing a refresh of %s workers',
+                      state, batch_size)
+        sys.stdout.flush()
+        sys.stderr.flush()
+
+        excess = 0
+        for _ in range(batch_size):
+            gunicorn_master_proc.send_signal(signal.SIGTTIN)
+            excess += 1
+            wait_until_true(lambda: num_workers_expected + excess ==
+                            get_num_workers_running(gunicorn_master_proc))
+
+    wait_until_true(lambda: num_workers_expected ==
+                    get_num_workers_running(gunicorn_master_proc))
+
+    while True:
+        check_master_alive()
+
+        num_workers_running = get_num_workers_running(gunicorn_master_proc)
+        num_ready_workers_running = \
+            get_num_ready_workers_running(gunicorn_master_proc)
+
+        state = '[{0} / {1}]'.format(num_ready_workers_running,
+                                     num_workers_running)
+
+        # Whenever some workers are not ready, wait until all workers are ready
+        if num_ready_workers_running < num_workers_running:
+            logging.debug('%s some workers are starting up, waiting...', state)
+            sys.stdout.flush()
+            time.sleep(1)
+
+        # Kill a worker gracefully by asking gunicorn to reduce number of workers
+        elif num_workers_running > num_workers_expected:
+            excess = num_workers_running - num_workers_expected
+            logging.debug('%s killing %s workers', state, excess)
+
+            for _ in range(excess):
+                gunicorn_master_proc.send_signal(signal.SIGTTOU)
+                excess -= 1
+                wait_until_true(lambda: num_workers_expected + excess ==
+                                get_num_workers_running(gunicorn_master_proc))
+
+        # Start a new worker by asking gunicorn to increase number of workers
+        elif num_workers_running == num_workers_expected:
+            refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
+            logging.debug(
+                '%s sleeping for %ss starting doing a refresh...',
+                state, refresh_interval
+            )
+            time.sleep(refresh_interval)
+            start_refresh(gunicorn_master_proc, state)
+
+        else:
+            # num_ready_workers_running == num_workers_running < num_workers_expected
+            logging.error((
+                "%s some workers seem to have died and gunicorn "
+                "did not restart them as expected"
+            ), state)
+            time.sleep(10)
+            # Request exactly the missing workers. Going through start_refresh
+            # would wait for num_workers_expected + batch_size workers, which
+            # may never be reached while workers are missing (hanging the
+            # monitor). The next loop iteration observes the outcome and
+            # trims any surplus with TTOU.
+            num_missing = (num_workers_expected -
+                           get_num_workers_running(gunicorn_master_proc))
+            for _ in range(num_missing):
+                gunicorn_master_proc.send_signal(signal.SIGTTIN)
+
def webserver(args):
+
print(settings.HEADER)
- from airflow.www.app import cached_app
app = cached_app(conf)
access_logfile = args.access_logfile or conf.get('webserver', 'access_logfile')
error_logfile = args.error_logfile or conf.get('webserver', 'error_logfile')
- workers = args.workers or conf.get('webserver', 'workers')
+ num_workers = args.workers or conf.get('webserver', 'workers')
+    # NOTE(review): when this falls back to conf.get the value is presumably a
+    # string (conf.getint is used elsewhere for ints); anything doing
+    # arithmetic or comparisons with num_workers must convert it first.
worker_timeout = (args.worker_timeout or
conf.get('webserver', 'webserver_worker_timeout'))
+
if args.debug:
print(
"Starting the web server on port {0} and host {1}.".format(
@@ -520,7 +634,7 @@ def webserver(args):
print(
textwrap.dedent('''\
Running the Gunicorn Server with:
- Workers: {workers} {args.workerclass}
+ Workers: {num_workers} {args.workerclass}
Host: {args.hostname}:{args.port}
Timeout: {worker_timeout}
Logfiles: {access_logfile} {error_logfile}
@@ -529,12 +643,13 @@ def webserver(args):
run_args = [
'gunicorn',
- '-w ' + str(args.workers),
- '-k ' + str(args.workerclass),
- '-t ' + str(args.worker_timeout),
- '-b ' + args.hostname + ':' + str(args.port),
- '-n ' + 'airflow-webserver',
- '-p ' + str(pid),
+ '-w', str(num_workers),
+ '-k', str(args.workerclass),
+ '-t', str(worker_timeout),
+ '-b', args.hostname + ':' + str(args.port),
+ '-n', 'airflow-webserver',
+ '-p', str(pid),
+ '-c', 'airflow.www.gunicorn_config'
]
if args.access_logfile:
@@ -546,11 +661,23 @@ def webserver(args):
if args.daemon:
run_args += ["-D"]
- module = "airflow.www.app:cached_app()".encode()
- run_args += [module]
- os.execvp(
- 'gunicorn', run_args
- )
+ run_args += ["airflow.www.app:cached_app()"]
+
+    # NOTE(review): with args.daemon the command carries -D, so gunicorn
+    # daemonizes; the Popen child below may then exit right away while the
+    # real master runs detached, and the worker monitoring further down would
+    # not see its workers. Confirm daemon mode still behaves as intended.
+ gunicorn_master_proc = subprocess.Popen(run_args)
+
+    # SIGINT/SIGTERM handler: terminate the gunicorn master, wait for it to
+    # exit, then exit this CLI process with status 0.
+ def kill_proc(dummy_signum, dummy_frame):
+ gunicorn_master_proc.terminate()
+ gunicorn_master_proc.wait()
+ sys.exit(0)
+
+ signal.signal(signal.SIGINT, kill_proc)
+ signal.signal(signal.SIGTERM, kill_proc)
+
+ # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
+ if conf.getint('webserver', 'worker_refresh_interval') > 0:
+ restart_workers(gunicorn_master_proc, num_workers)
+ else:
+        # Refresh disabled: just keep this process alive so kill_proc can
+        # shut gunicorn down when a signal arrives.
+ while True: time.sleep(1)
def scheduler(args):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d254a31/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 9313366..31b8f36 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -121,6 +121,8 @@ defaults = {
'web_server_host': '0.0.0.0',
'web_server_port': '8080',
'web_server_worker_timeout': 120,
+ 'worker_refresh_batch_size': 1,
+ 'worker_refresh_interval': 30,
'authenticate': False,
'filter_by_owner': False,
'owner_mode': 'user',
@@ -282,9 +284,17 @@ web_server_host = 0.0.0.0
# The port on which to run the web server
web_server_port = 8080
-# The time the gunicorn webserver waits before timing out on a worker
+# Number of seconds the gunicorn webserver waits before timing out on a worker
web_server_worker_timeout = 120
+# Number of workers to refresh at a time. When set to 0, worker refresh is
+# disabled. When nonzero, airflow periodically refreshes webserver workers by
+# bringing up new ones and killing old ones.
+worker_refresh_batch_size = 1
+
+# Number of seconds to wait before refreshing a batch of workers.
+worker_refresh_interval = 30
+
# Secret key used to run your flask app
secret_key = temporary_key
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d254a31/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index ccd77ee..35928ec 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -72,9 +72,14 @@ SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')
LOGGING_LEVEL = logging.INFO
DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
+# Prefix that the gunicorn post_worker_init hook (airflow/www/gunicorn_config.py)
+# prepends to a worker's process title once the worker has initialized; the
+# webserver CLI counts workers whose cmdline contains it as "ready".
+GUNICORN_WORKER_READY_PREFIX = "[ready] "
+
# can't move this to conf due to ConfigParser interpolation
LOG_FORMAT = (
'[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
+# LOG_FORMAT plus the process id (%(process)d); used by the web app
+# (airflow/www/app.py).
+LOG_FORMAT_WITH_PID = (
+ '[%(asctime)s] [%(process)d] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
LOG_FORMAT_WITH_THREAD_NAME = (
'[%(asctime)s] {%(filename)s:%(lineno)d} %(threadName)s %(levelname)s - %(message)s')
SIMPLE_LOG_FORMAT = '%(asctime)s %(levelname)s - %(message)s'
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d254a31/airflow/www/app.py
----------------------------------------------------------------------
diff --git a/airflow/www/app.py b/airflow/www/app.py
index 3953511..10d8420 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -47,6 +47,9 @@ def create_app(config=None):
app.register_blueprint(routes)
+ log_format = airflow.settings.LOG_FORMAT_WITH_PID
+ airflow.settings.configure_logging(log_format=log_format)
+
with app.app_context():
from airflow.www import views
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d254a31/airflow/www/gunicorn_config.py
----------------------------------------------------------------------
diff --git a/airflow/www/gunicorn_config.py b/airflow/www/gunicorn_config.py
new file mode 100644
index 0000000..e1fd9b6
--- /dev/null
+++ b/airflow/www/gunicorn_config.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import setproctitle
+from airflow import settings
+
+
+def post_worker_init(dummy_worker):
+    """
+    Gunicorn server hook, called in each worker once it has initialized.
+
+    Prefixes the worker's process title with
+    settings.GUNICORN_WORKER_READY_PREFIX so that a process monitor can tell
+    workers that are ready to serve requests apart from ones still starting
+    up. The worker argument is unused.
+    """
+    original_title = setproctitle.getproctitle()
+    setproctitle.setproctitle(
+        settings.GUNICORN_WORKER_READY_PREFIX + original_title)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9d254a31/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index 3317324..5d7b4bb 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -31,6 +31,7 @@ pyhive
pydruid
PyOpenSSL
PySmbClient
+psutil>=4.2.0, <5.0.0
psycopg2
python-dateutil
redis