You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/04 06:32:57 UTC

incubator-airflow git commit: [AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background

Repository: incubator-airflow
Updated Branches:
  refs/heads/master e4494f85e -> a9b20a04b


[AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background

AIRFLOW-276 introduced a monitor process for
gunicorn
to find new files in the dag folder, but it also
changed
`airflow webserver -D`'s behavior to run in
foreground.
This PR fixes that by running the monitor as a
daemon process.

Closes #2208 from sekikn/AIRFLOW-1004


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a9b20a04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a9b20a04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a9b20a04

Branch: refs/heads/master
Commit: a9b20a04b052e9479dbb79fd46124293085610e9
Parents: e4494f8
Author: Kengo Seki <se...@apache.org>
Authored: Tue Apr 4 08:32:44 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Apr 4 08:32:44 2017 +0200

----------------------------------------------------------------------
 airflow/bin/cli.py | 64 ++++++++++++++++++++++++++++++++++++++++---------
 tests/core.py      | 56 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 109 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9b20a04/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index e9c54e6..e4755c7 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -753,7 +753,12 @@ def webserver(args):
         app.run(debug=True, port=args.port, host=args.hostname,
                 ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None)
     else:
-        pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid)
+        pid, stdout, stderr, log_file = setup_locations("webserver", args.pid, args.stdout, args.stderr, args.log_file)
+        if args.daemon:
+            handle = setup_logging(log_file)
+            stdout = open(stdout, 'w+')
+            stderr = open(stderr, 'w+')
+
         print(
             textwrap.dedent('''\
                 Running the Gunicorn Server with:
@@ -771,7 +776,6 @@ def webserver(args):
             '-t', str(worker_timeout),
             '-b', args.hostname + ':' + str(args.port),
             '-n', 'airflow-webserver',
-            '-p', str(pid),
             '-c', 'airflow.www.gunicorn_config'
         ]
 
@@ -782,28 +786,66 @@ def webserver(args):
             run_args += ['--error-logfile', str(args.error_logfile)]
 
         if args.daemon:
-            run_args += ["-D"]
+            run_args += ['-D', '-p', str(pid)]
+
         if ssl_cert:
             run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]
 
         run_args += ["airflow.www.app:cached_app()"]
 
-        gunicorn_master_proc = subprocess.Popen(run_args)
+        gunicorn_master_proc = None
 
         def kill_proc(dummy_signum, dummy_frame):
             gunicorn_master_proc.terminate()
             gunicorn_master_proc.wait()
             sys.exit(0)
 
-        signal.signal(signal.SIGINT, kill_proc)
-        signal.signal(signal.SIGTERM, kill_proc)
+        def monitor_gunicorn(gunicorn_master_proc):
+            # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
+            if conf.getint('webserver', 'worker_refresh_interval') > 0:
+                restart_workers(gunicorn_master_proc, num_workers)
+            else:
+                while True:
+                    time.sleep(1)
 
-        # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
-        if conf.getint('webserver', 'worker_refresh_interval') > 0:
-            restart_workers(gunicorn_master_proc, num_workers)
+        if args.daemon:
+            base, ext = os.path.splitext(pid)
+            ctx = daemon.DaemonContext(
+                pidfile=TimeoutPIDLockFile(base + "-monitor" + ext, -1),
+                files_preserve=[handle],
+                stdout=stdout,
+                stderr=stderr,
+                signal_map={
+                    signal.SIGINT: kill_proc,
+                    signal.SIGTERM: kill_proc
+                },
+            )
+            with ctx:
+                subprocess.Popen(run_args)
+
+                # Reading pid file directly, since Popen#pid doesn't
+                # seem to return the right value with DaemonContext.
+                while True:
+                    try:
+                        with open(pid) as f:
+                            gunicorn_master_proc_pid = int(f.read())
+                            break
+                    except IOError:
+                        logging.debug("Waiting for gunicorn's pid file to be created.")
+                        time.sleep(0.1)
+
+                gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
+                monitor_gunicorn(gunicorn_master_proc)
+
+            stdout.close()
+            stderr.close()
         else:
-            while True:
-                time.sleep(1)
+            gunicorn_master_proc = subprocess.Popen(run_args)
+
+            signal.signal(signal.SIGINT, kill_proc)
+            signal.signal(signal.SIGTERM, kill_proc)
+
+            monitor_gunicorn(gunicorn_master_proc)
 
 
 def scheduler(args):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9b20a04/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 997bb42..7da08e1 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1427,6 +1427,62 @@ class CliTests(unittest.TestCase):
         os.remove('variables1.json')
         os.remove('variables2.json')
 
+    def test_cli_webserver_foreground(self):
+        import subprocess
+
+        # Confirm that webserver hasn't been launched.
+        # pgrep returns exit status 1 if no process matched.
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+
+        # Run webserver in foreground and terminate it.
+        p = subprocess.Popen(["airflow", "webserver"])
+        p.terminate()
+        p.wait()
+
+        # Assert that no process remains.
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+
+    @unittest.skipIf("TRAVIS" in os.environ and bool(os.environ["TRAVIS"]),
+                     "Skipping test due to lack of required file permission")
+    def test_cli_webserver_background(self):
+        import subprocess
+        import psutil
+
+        def wait_pidfile(pidfile):
+            while True:
+                try:
+                    with open(pidfile) as f:
+                        return int(f.read())
+                except IOError:
+                    sleep(1)
+
+        # Confirm that webserver hasn't been launched.
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+
+        # Run webserver in background.
+        subprocess.Popen(["airflow", "webserver", "-D"])
+        pidfile = cli.setup_locations("webserver")[0]
+        wait_pidfile(pidfile)
+
+        # Assert that gunicorn and its monitor are launched.
+        self.assertEqual(0, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+        self.assertEqual(0, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+
+        # Terminate monitor process.
+        pidfile = cli.setup_locations("webserver-monitor")[0]
+        pid = wait_pidfile(pidfile)
+        p = psutil.Process(pid)
+        p.terminate()
+        p.wait()
+
+        # Assert that no process remains.
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+        self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+
+
 class SecurityTests(unittest.TestCase):
     def setUp(self):
         configuration.load_test_config()