You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/04 06:32:57 UTC
incubator-airflow git commit: [AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background
Repository: incubator-airflow
Updated Branches:
refs/heads/master e4494f85e -> a9b20a04b
[AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background
AIRFLOW-276 introduced a monitor process for gunicorn so that new files in the DAG folder are picked up, but it also changed `airflow webserver -D` so that it runs in the foreground.
This PR fixes that by running the monitor as a daemon process.
Closes #2208 from sekikn/AIRFLOW-1004
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a9b20a04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a9b20a04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a9b20a04
Branch: refs/heads/master
Commit: a9b20a04b052e9479dbb79fd46124293085610e9
Parents: e4494f8
Author: Kengo Seki <se...@apache.org>
Authored: Tue Apr 4 08:32:44 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Apr 4 08:32:44 2017 +0200
----------------------------------------------------------------------
airflow/bin/cli.py | 64 ++++++++++++++++++++++++++++++++++++++++---------
tests/core.py | 56 +++++++++++++++++++++++++++++++++++++++++++
2 files changed, 109 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9b20a04/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index e9c54e6..e4755c7 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -753,7 +753,12 @@ def webserver(args):
app.run(debug=True, port=args.port, host=args.hostname,
ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None)
else:
- pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid)
+ pid, stdout, stderr, log_file = setup_locations("webserver", args.pid, args.stdout, args.stderr, args.log_file)
+ if args.daemon:
+ handle = setup_logging(log_file)
+ stdout = open(stdout, 'w+')
+ stderr = open(stderr, 'w+')
+
print(
textwrap.dedent('''\
Running the Gunicorn Server with:
@@ -771,7 +776,6 @@ def webserver(args):
'-t', str(worker_timeout),
'-b', args.hostname + ':' + str(args.port),
'-n', 'airflow-webserver',
- '-p', str(pid),
'-c', 'airflow.www.gunicorn_config'
]
@@ -782,28 +786,66 @@ def webserver(args):
run_args += ['--error-logfile', str(args.error_logfile)]
if args.daemon:
- run_args += ["-D"]
+ run_args += ['-D', '-p', str(pid)]
+
if ssl_cert:
run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]
run_args += ["airflow.www.app:cached_app()"]
- gunicorn_master_proc = subprocess.Popen(run_args)
+ gunicorn_master_proc = None
def kill_proc(dummy_signum, dummy_frame):
gunicorn_master_proc.terminate()
gunicorn_master_proc.wait()
sys.exit(0)
- signal.signal(signal.SIGINT, kill_proc)
- signal.signal(signal.SIGTERM, kill_proc)
+ def monitor_gunicorn(gunicorn_master_proc):
+ # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
+ if conf.getint('webserver', 'worker_refresh_interval') > 0:
+ restart_workers(gunicorn_master_proc, num_workers)
+ else:
+ while True:
+ time.sleep(1)
- # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
- if conf.getint('webserver', 'worker_refresh_interval') > 0:
- restart_workers(gunicorn_master_proc, num_workers)
+ if args.daemon:
+ base, ext = os.path.splitext(pid)
+ ctx = daemon.DaemonContext(
+ pidfile=TimeoutPIDLockFile(base + "-monitor" + ext, -1),
+ files_preserve=[handle],
+ stdout=stdout,
+ stderr=stderr,
+ signal_map={
+ signal.SIGINT: kill_proc,
+ signal.SIGTERM: kill_proc
+ },
+ )
+ with ctx:
+ subprocess.Popen(run_args)
+
+ # Reading pid file directly, since Popen#pid doesn't
+ # seem to return the right value with DaemonContext.
+ while True:
+ try:
+ with open(pid) as f:
+ gunicorn_master_proc_pid = int(f.read())
+ break
+ except IOError:
+ logging.debug("Waiting for gunicorn's pid file to be created.")
+ time.sleep(0.1)
+
+ gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
+ monitor_gunicorn(gunicorn_master_proc)
+
+ stdout.close()
+ stderr.close()
else:
- while True:
- time.sleep(1)
+ gunicorn_master_proc = subprocess.Popen(run_args)
+
+ signal.signal(signal.SIGINT, kill_proc)
+ signal.signal(signal.SIGTERM, kill_proc)
+
+ monitor_gunicorn(gunicorn_master_proc)
def scheduler(args):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9b20a04/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 997bb42..7da08e1 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1427,6 +1427,62 @@ class CliTests(unittest.TestCase):
os.remove('variables1.json')
os.remove('variables2.json')
+ def test_cli_webserver_foreground(self):
+ import subprocess
+
+ # Confirm that webserver hasn't been launched.
+ # pgrep returns exit status 1 if no process matched.
+ self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+ self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+
+ # Run webserver in foreground and terminate it.
+ p = subprocess.Popen(["airflow", "webserver"])
+ p.terminate()
+ p.wait()
+
+ # Assert that no process remains.
+ self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+ self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+
+ @unittest.skipIf("TRAVIS" in os.environ and bool(os.environ["TRAVIS"]),
+ "Skipping test due to lack of required file permission")
+ def test_cli_webserver_background(self):
+ import subprocess
+ import psutil
+
+ def wait_pidfile(pidfile):
+ while True:
+ try:
+ with open(pidfile) as f:
+ return int(f.read())
+ except IOError:
+ sleep(1)
+
+ # Confirm that webserver hasn't been launched.
+ self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+ self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+
+ # Run webserver in background.
+ subprocess.Popen(["airflow", "webserver", "-D"])
+ pidfile = cli.setup_locations("webserver")[0]
+ wait_pidfile(pidfile)
+
+ # Assert that gunicorn and its monitor are launched.
+ self.assertEqual(0, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+ self.assertEqual(0, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+
+ # Terminate monitor process.
+ pidfile = cli.setup_locations("webserver-monitor")[0]
+ pid = wait_pidfile(pidfile)
+ p = psutil.Process(pid)
+ p.terminate()
+ p.wait()
+
+ # Assert that no process remains.
+ self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait())
+ self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait())
+
+
class SecurityTests(unittest.TestCase):
def setUp(self):
configuration.load_test_config()