You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ar...@apache.org on 2018/03/22 18:51:00 UTC
incubator-airflow git commit: [AIRFLOW-1235] Fix webserver's odd
behaviour
Repository: incubator-airflow
Updated Branches:
refs/heads/master 8c42d03c4 -> 7e762d42d
[AIRFLOW-1235] Fix webserver's odd behaviour
In some cases, the gunicorn master shuts down
but the webserver monitor process doesn't.
This PR adds timeout functionality to shut down
all related processes in such cases.
Dear Airflow maintainers,
Please accept this PR. I understand that it will
not be reviewed until I have checked off all the
steps below!
### JIRA
- [x] My PR addresses the following
[Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW/)
issues and references them in the PR title. For
example, "[AIRFLOW-XXX] My Airflow PR"
- https://issues.apache.org/jira/browse/AIRFLOW-1235
### Description
- [x] Here are some details about my PR, including
screenshots of any UI changes:
In some cases, the gunicorn master shuts down
but the webserver monitor process doesn't.
This PR adds timeout functionality to shut down
all related processes in such cases.
### Tests
- [x] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
tests.core:CliTests.test_cli_webserver_shutdown_when_gunicorn_master_is_killed
### Commits
- [x] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit
message](http://chris.beams.io/posts/git-commit/)":
1. Subject is separated from body by a blank line
2. Subject is limited to 50 characters
3. Subject does not end with a period
4. Subject uses the imperative mood ("add", not
"adding")
5. Body wraps at 72 characters
6. Body explains "what" and "why", not "how"
Closes #2330 from sekikn/AIRFLOW-1235
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7e762d42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7e762d42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7e762d42
Branch: refs/heads/master
Commit: 7e762d42df50d84e4740e15c24594c50aaab53a2
Parents: 8c42d03
Author: Kengo Seki <se...@apache.org>
Authored: Thu Mar 22 11:50:27 2018 -0700
Committer: Arthur Wiedmer <aw...@netflix.com>
Committed: Thu Mar 22 11:50:27 2018 -0700
----------------------------------------------------------------------
airflow/bin/cli.py | 129 +++++++++++++---------
airflow/config_templates/default_airflow.cfg | 3 +
airflow/exceptions.py | 4 +
tests/core.py | 10 ++
4 files changed, 91 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e762d42/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 449d8ca..1801cc7 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -46,7 +46,7 @@ import airflow
from airflow import api
from airflow import jobs, settings
from airflow import configuration as conf
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowWebServerTimeout
from airflow.executors import GetDefaultExecutor
from airflow.models import (DagModel, DagBag, TaskInstance,
DagPickle, DagRun, Variable, DagStat,
@@ -592,7 +592,12 @@ def get_num_ready_workers_running(gunicorn_master_proc):
return len(ready_workers)
-def restart_workers(gunicorn_master_proc, num_workers_expected):
+def get_num_workers_running(gunicorn_master_proc):
+ workers = psutil.Process(gunicorn_master_proc.pid).children()
+ return len(workers)
+
+
+def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
"""
Runs forever, monitoring the child processes of @gunicorn_master_proc and
restarting workers occasionally.
@@ -618,17 +623,18 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
gracefully and that the oldest worker is terminated.
"""
- def wait_until_true(fn):
+ def wait_until_true(fn, timeout=0):
"""
Sleeps until fn is true
"""
+ t = time.time()
while not fn():
+ if 0 < timeout and timeout <= time.time() - t:
+ raise AirflowWebServerTimeout(
+ "No response from gunicorn master within {0} seconds"
+ .format(timeout))
time.sleep(0.1)
- def get_num_workers_running(gunicorn_master_proc):
- workers = psutil.Process(gunicorn_master_proc.pid).children()
- return len(workers)
-
def start_refresh(gunicorn_master_proc):
batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
log.debug('%s doing a refresh of %s workers', state, batch_size)
@@ -640,56 +646,68 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
gunicorn_master_proc.send_signal(signal.SIGTTIN)
excess += 1
wait_until_true(lambda: num_workers_expected + excess ==
- get_num_workers_running(gunicorn_master_proc))
-
- wait_until_true(lambda: num_workers_expected ==
- get_num_workers_running(gunicorn_master_proc))
-
- while True:
- num_workers_running = get_num_workers_running(gunicorn_master_proc)
- num_ready_workers_running = get_num_ready_workers_running(gunicorn_master_proc)
-
- state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running)
-
- # Whenever some workers are not ready, wait until all workers are ready
- if num_ready_workers_running < num_workers_running:
- log.debug('%s some workers are starting up, waiting...', state)
- sys.stdout.flush()
- time.sleep(1)
-
- # Kill a worker gracefully by asking gunicorn to reduce number of workers
- elif num_workers_running > num_workers_expected:
- excess = num_workers_running - num_workers_expected
- log.debug('%s killing %s workers', state, excess)
-
- for _ in range(excess):
- gunicorn_master_proc.send_signal(signal.SIGTTOU)
- excess -= 1
- wait_until_true(lambda: num_workers_expected + excess ==
- get_num_workers_running(gunicorn_master_proc))
-
- # Start a new worker by asking gunicorn to increase number of workers
- elif num_workers_running == num_workers_expected:
- refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
- log.debug(
- '%s sleeping for %ss starting doing a refresh...',
- state, refresh_interval
- )
- time.sleep(refresh_interval)
- start_refresh(gunicorn_master_proc)
+ get_num_workers_running(gunicorn_master_proc),
+ master_timeout)
- else:
- # num_ready_workers_running == num_workers_running < num_workers_expected
- log.error((
- "%s some workers seem to have died and gunicorn"
- "did not restart them as expected"
- ), state)
- time.sleep(10)
- if len(
- psutil.Process(gunicorn_master_proc.pid).children()
- ) < num_workers_expected:
+ try:
+ wait_until_true(lambda: num_workers_expected ==
+ get_num_workers_running(gunicorn_master_proc),
+ master_timeout)
+ while True:
+ num_workers_running = get_num_workers_running(gunicorn_master_proc)
+ num_ready_workers_running = \
+ get_num_ready_workers_running(gunicorn_master_proc)
+
+ state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running)
+
+ # Whenever some workers are not ready, wait until all workers are ready
+ if num_ready_workers_running < num_workers_running:
+ log.debug('%s some workers are starting up, waiting...', state)
+ sys.stdout.flush()
+ time.sleep(1)
+
+ # Kill a worker gracefully by asking gunicorn to reduce number of workers
+ elif num_workers_running > num_workers_expected:
+ excess = num_workers_running - num_workers_expected
+ log.debug('%s killing %s workers', state, excess)
+
+ for _ in range(excess):
+ gunicorn_master_proc.send_signal(signal.SIGTTOU)
+ excess -= 1
+ wait_until_true(lambda: num_workers_expected + excess ==
+ get_num_workers_running(gunicorn_master_proc),
+ master_timeout)
+
+ # Start a new worker by asking gunicorn to increase number of workers
+ elif num_workers_running == num_workers_expected:
+ refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
+ log.debug(
+ '%s sleeping for %ss starting doing a refresh...',
+ state, refresh_interval
+ )
+ time.sleep(refresh_interval)
start_refresh(gunicorn_master_proc)
+ else:
+ # num_ready_workers_running == num_workers_running < num_workers_expected
+ log.error((
+ "%s some workers seem to have died and gunicorn"
+ "did not restart them as expected"
+ ), state)
+ time.sleep(10)
+ if len(
+ psutil.Process(gunicorn_master_proc.pid).children()
+ ) < num_workers_expected:
+ start_refresh(gunicorn_master_proc)
+ except (AirflowWebServerTimeout, OSError) as err:
+ log.error(err)
+ log.error("Shutting down webserver")
+ try:
+ gunicorn_master_proc.terminate()
+ gunicorn_master_proc.wait()
+ finally:
+ sys.exit(1)
+
def webserver(args):
print(settings.HEADER)
@@ -769,7 +787,8 @@ def webserver(args):
def monitor_gunicorn(gunicorn_master_proc):
# These run forever until SIG{INT, TERM, KILL, ...} signal is sent
if conf.getint('webserver', 'worker_refresh_interval') > 0:
- restart_workers(gunicorn_master_proc, num_workers)
+ master_timeout = conf.getint('webserver', 'web_server_master_timeout')
+ restart_workers(gunicorn_master_proc, num_workers, master_timeout)
else:
while True:
time.sleep(1)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e762d42/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index acfd15e..8f82208 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -198,6 +198,9 @@ web_server_port = 8080
web_server_ssl_cert =
web_server_ssl_key =
+# Number of seconds the webserver waits before killing gunicorn master that doesn't respond
+web_server_master_timeout = 120
+
# Number of seconds the gunicorn webserver waits before timing out on a worker
web_server_worker_timeout = 120
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e762d42/airflow/exceptions.py
----------------------------------------------------------------------
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index c1b728c..f4527b2 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -32,6 +32,10 @@ class AirflowTaskTimeout(AirflowException):
pass
+class AirflowWebServerTimeout(AirflowException):
+ pass
+
+
class AirflowSkipException(AirflowException):
pass
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e762d42/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index ce5fb7a..2ac7b66 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1475,6 +1475,16 @@ class CliTests(unittest.TestCase):
self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+ # Patch for causing webserver timeout
+ @mock.patch("airflow.bin.cli.get_num_workers_running", return_value=0)
+ def test_cli_webserver_shutdown_when_gunicorn_master_is_killed(self, _):
+ # Shorten timeout so that this test doesn't take too long time
+ configuration.conf.set("webserver", "web_server_master_timeout", "10")
+ args = self.parser.parse_args(['webserver'])
+ with self.assertRaises(SystemExit) as e:
+ cli.webserver(args)
+ self.assertEqual(e.exception.code, 1)
+
class SecurityTests(unittest.TestCase):
def setUp(self):