You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ar...@apache.org on 2018/03/22 18:51:00 UTC

incubator-airflow git commit: [AIRFLOW-1235] Fix webserver's odd behaviour

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 8c42d03c4 -> 7e762d42d


[AIRFLOW-1235] Fix webserver's odd behaviour

In some cases, the gunicorn master shuts down
but the webserver monitor process doesn't.
This PR adds timeout functionality to shut down
all related processes in such cases.

Dear Airflow maintainers,

Please accept this PR. I understand that it will
not be reviewed until I have checked off all the
steps below!

### JIRA
- [x] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW/)
issues and references them in the PR title. For
example, "[AIRFLOW-XXX] My Airflow PR"
    - https://issues.apache.org/jira/browse/AIRFLOW-1235

### Description
- [x] Here are some details about my PR, including
screenshots of any UI changes:

In some cases, the gunicorn master shuts down
but the webserver monitor process doesn't.
This PR adds timeout functionality to shut down
all related processes in such cases.

### Tests
- [x] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:

tests.core:CliTests.test_cli_webserver_shutdown_when_gunicorn_master_is_killed

### Commits
- [x] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Closes #2330 from sekikn/AIRFLOW-1235


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7e762d42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7e762d42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7e762d42

Branch: refs/heads/master
Commit: 7e762d42df50d84e4740e15c24594c50aaab53a2
Parents: 8c42d03
Author: Kengo Seki <se...@apache.org>
Authored: Thu Mar 22 11:50:27 2018 -0700
Committer: Arthur Wiedmer <aw...@netflix.com>
Committed: Thu Mar 22 11:50:27 2018 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py                           | 129 +++++++++++++---------
 airflow/config_templates/default_airflow.cfg |   3 +
 airflow/exceptions.py                        |   4 +
 tests/core.py                                |  10 ++
 4 files changed, 91 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e762d42/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 449d8ca..1801cc7 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -46,7 +46,7 @@ import airflow
 from airflow import api
 from airflow import jobs, settings
 from airflow import configuration as conf
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowWebServerTimeout
 from airflow.executors import GetDefaultExecutor
 from airflow.models import (DagModel, DagBag, TaskInstance,
                             DagPickle, DagRun, Variable, DagStat,
@@ -592,7 +592,12 @@ def get_num_ready_workers_running(gunicorn_master_proc):
     return len(ready_workers)
 
 
-def restart_workers(gunicorn_master_proc, num_workers_expected):
+def get_num_workers_running(gunicorn_master_proc):
+    workers = psutil.Process(gunicorn_master_proc.pid).children()
+    return len(workers)
+
+
+def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
     """
     Runs forever, monitoring the child processes of @gunicorn_master_proc and
     restarting workers occasionally.
@@ -618,17 +623,18 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
     gracefully and that the oldest worker is terminated.
     """
 
-    def wait_until_true(fn):
+    def wait_until_true(fn, timeout=0):
         """
         Sleeps until fn is true
         """
+        t = time.time()
         while not fn():
+            if 0 < timeout and timeout <= time.time() - t:
+                raise AirflowWebServerTimeout(
+                    "No response from gunicorn master within {0} seconds"
+                    .format(timeout))
             time.sleep(0.1)
 
-    def get_num_workers_running(gunicorn_master_proc):
-        workers = psutil.Process(gunicorn_master_proc.pid).children()
-        return len(workers)
-
     def start_refresh(gunicorn_master_proc):
         batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
         log.debug('%s doing a refresh of %s workers', state, batch_size)
@@ -640,56 +646,68 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
             gunicorn_master_proc.send_signal(signal.SIGTTIN)
             excess += 1
             wait_until_true(lambda: num_workers_expected + excess ==
-                                    get_num_workers_running(gunicorn_master_proc))
-
-    wait_until_true(lambda: num_workers_expected ==
-                            get_num_workers_running(gunicorn_master_proc))
-
-    while True:
-        num_workers_running = get_num_workers_running(gunicorn_master_proc)
-        num_ready_workers_running = get_num_ready_workers_running(gunicorn_master_proc)
-
-        state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running)
-
-        # Whenever some workers are not ready, wait until all workers are ready
-        if num_ready_workers_running < num_workers_running:
-            log.debug('%s some workers are starting up, waiting...', state)
-            sys.stdout.flush()
-            time.sleep(1)
-
-        # Kill a worker gracefully by asking gunicorn to reduce number of workers
-        elif num_workers_running > num_workers_expected:
-            excess = num_workers_running - num_workers_expected
-            log.debug('%s killing %s workers', state, excess)
-
-            for _ in range(excess):
-                gunicorn_master_proc.send_signal(signal.SIGTTOU)
-                excess -= 1
-                wait_until_true(lambda: num_workers_expected + excess ==
-                                        get_num_workers_running(gunicorn_master_proc))
-
-        # Start a new worker by asking gunicorn to increase number of workers
-        elif num_workers_running == num_workers_expected:
-            refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
-            log.debug(
-                '%s sleeping for %ss starting doing a refresh...',
-                state, refresh_interval
-            )
-            time.sleep(refresh_interval)
-            start_refresh(gunicorn_master_proc)
+                            get_num_workers_running(gunicorn_master_proc),
+                            master_timeout)
 
-        else:
-            # num_ready_workers_running == num_workers_running < num_workers_expected
-            log.error((
-                "%s some workers seem to have died and gunicorn"
-                "did not restart them as expected"
-            ), state)
-            time.sleep(10)
-            if len(
-                psutil.Process(gunicorn_master_proc.pid).children()
-            ) < num_workers_expected:
+    try:
+        wait_until_true(lambda: num_workers_expected ==
+                        get_num_workers_running(gunicorn_master_proc),
+                        master_timeout)
+        while True:
+            num_workers_running = get_num_workers_running(gunicorn_master_proc)
+            num_ready_workers_running = \
+                get_num_ready_workers_running(gunicorn_master_proc)
+
+            state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running)
+
+            # Whenever some workers are not ready, wait until all workers are ready
+            if num_ready_workers_running < num_workers_running:
+                log.debug('%s some workers are starting up, waiting...', state)
+                sys.stdout.flush()
+                time.sleep(1)
+
+            # Kill a worker gracefully by asking gunicorn to reduce number of workers
+            elif num_workers_running > num_workers_expected:
+                excess = num_workers_running - num_workers_expected
+                log.debug('%s killing %s workers', state, excess)
+
+                for _ in range(excess):
+                    gunicorn_master_proc.send_signal(signal.SIGTTOU)
+                    excess -= 1
+                    wait_until_true(lambda: num_workers_expected + excess ==
+                                    get_num_workers_running(gunicorn_master_proc),
+                                    master_timeout)
+
+            # Start a new worker by asking gunicorn to increase number of workers
+            elif num_workers_running == num_workers_expected:
+                refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
+                log.debug(
+                    '%s sleeping for %ss starting doing a refresh...',
+                    state, refresh_interval
+                )
+                time.sleep(refresh_interval)
                 start_refresh(gunicorn_master_proc)
 
+            else:
+                # num_ready_workers_running == num_workers_running < num_workers_expected
+                log.error((
+                    "%s some workers seem to have died and gunicorn"
+                    "did not restart them as expected"
+                ), state)
+                time.sleep(10)
+                if len(
+                    psutil.Process(gunicorn_master_proc.pid).children()
+                ) < num_workers_expected:
+                    start_refresh(gunicorn_master_proc)
+    except (AirflowWebServerTimeout, OSError) as err:
+        log.error(err)
+        log.error("Shutting down webserver")
+        try:
+            gunicorn_master_proc.terminate()
+            gunicorn_master_proc.wait()
+        finally:
+            sys.exit(1)
+
 
 def webserver(args):
     print(settings.HEADER)
@@ -769,7 +787,8 @@ def webserver(args):
         def monitor_gunicorn(gunicorn_master_proc):
             # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
             if conf.getint('webserver', 'worker_refresh_interval') > 0:
-                restart_workers(gunicorn_master_proc, num_workers)
+                master_timeout = conf.getint('webserver', 'web_server_master_timeout')
+                restart_workers(gunicorn_master_proc, num_workers, master_timeout)
             else:
                 while True:
                     time.sleep(1)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e762d42/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index acfd15e..8f82208 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -198,6 +198,9 @@ web_server_port = 8080
 web_server_ssl_cert =
 web_server_ssl_key =
 
+# Number of seconds the webserver waits before killing gunicorn master that doesn't respond
+web_server_master_timeout = 120
+
 # Number of seconds the gunicorn webserver waits before timing out on a worker
 web_server_worker_timeout = 120
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e762d42/airflow/exceptions.py
----------------------------------------------------------------------
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index c1b728c..f4527b2 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -32,6 +32,10 @@ class AirflowTaskTimeout(AirflowException):
     pass
 
 
+class AirflowWebServerTimeout(AirflowException):
+    pass
+
+
 class AirflowSkipException(AirflowException):
     pass
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e762d42/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index ce5fb7a..2ac7b66 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1475,6 +1475,16 @@ class CliTests(unittest.TestCase):
         self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
         self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
 
+    # Patch for causing webserver timeout
+    @mock.patch("airflow.bin.cli.get_num_workers_running", return_value=0)
+    def test_cli_webserver_shutdown_when_gunicorn_master_is_killed(self, _):
+        # Shorten timeout so that this test doesn't take too long time
+        configuration.conf.set("webserver", "web_server_master_timeout", "10")
+        args = self.parser.parse_args(['webserver'])
+        with self.assertRaises(SystemExit) as e:
+            cli.webserver(args)
+        self.assertEqual(e.exception.code, 1)
+
 
 class SecurityTests(unittest.TestCase):
     def setUp(self):