You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/12/02 15:23:01 UTC

[3/5] incubator-airflow git commit: [AIRFLOW-1559] Dispose SQLAlchemy engines on exit

[AIRFLOW-1559] Dispose SQLAlchemy engines on exit

When a forked process or the entire interpreter terminates, we have
to close all pooled database connections. The database can run out
of connections otherwise. At a minimum, it will print errors in its
log file.

By using an atexit handler we ensure that connections are closed
for each interpreter and Gunicorn worker termination. Only usages
of multiprocessing.Process require special handling as those
terminate via os._exit() which does not run finalizers.

This commit is based on a contribution by @dhuang
https://github.com/apache/incubator-airflow/pull/2767


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6bf1a6ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6bf1a6ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6bf1a6ed

Branch: refs/heads/master
Commit: 6bf1a6edaf13d3e255c47488f2747a2b8ebeff6c
Parents: 5a303eb
Author: Stephan Erb <st...@blue-yonder.com>
Authored: Sat Nov 25 22:21:28 2017 +0100
Committer: Stephan Erb <st...@blue-yonder.com>
Committed: Wed Nov 29 09:49:57 2017 +0100

----------------------------------------------------------------------
 airflow/jobs.py     |  3 +++
 airflow/settings.py | 19 +++++++++++++++++++
 2 files changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6bf1a6ed/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 868e785..91ab96c 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -384,6 +384,9 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
             finally:
                 sys.stdout = sys.__stdout__
                 sys.stderr = sys.__stderr__
+                # We re-initialized the ORM within this Process above so we need to
+                # tear it down manually here
+                settings.dispose_orm()
 
         p = multiprocessing.Process(target=helper,
                                     args=(),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6bf1a6ed/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 5559646..04d3548 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -17,6 +17,7 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
+import atexit
 import logging
 import os
 import pendulum
@@ -136,6 +137,7 @@ def configure_vars():
 
 
 def configure_orm(disable_connection_pool=False):
+    log.debug("Setting up DB connection pool (PID %s)" % os.getpid())
     global engine
     global Session
     engine_args = {}
@@ -154,6 +156,20 @@ def configure_orm(disable_connection_pool=False):
         sessionmaker(autocommit=False, autoflush=False, bind=engine))
 
 
+def dispose_orm():
+    """ Properly close pooled database connections """
+    log.debug("Disposing DB connection pool (PID %s)", os.getpid())
+    global engine
+    global Session
+
+    if Session:
+        Session.remove()
+        Session = None
+    if engine:
+        engine.dispose()
+        engine = None
+
+
 def configure_adapters():
     from pendulum import Pendulum
     try:
@@ -180,6 +196,9 @@ configure_vars()
 configure_adapters()
 configure_orm()
 
+# Ensure we close DB connections at scheduler and gunicorn worker terminations
+atexit.register(dispose_orm)
+
 # Const stuff
 
 KILOBYTE = 1024