You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/19 16:56:54 UTC
[airflow] branch v1-10-test updated: Pass SQLAlchemy engine options
to FAB based UI (#11395)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new 8cce998 Pass SQLAlchemy engine options to FAB based UI (#11395)
8cce998 is described below
commit 8cce998da80229bab345e863461a1881fc1e9999
Author: MichaĆ Misiewicz <mi...@gmail.com>
AuthorDate: Fri Oct 16 19:55:41 2020 +0200
Pass SQLAlchemy engine options to FAB based UI (#11395)
Co-authored-by: Tomek Urbaszek <tu...@gmail.com>
(cherry picked from commit 91484b938f0b6f943404f1aeb3e63b61b808cfe9)
---
airflow/settings.py | 57 ++++++++++++++++++++++++----------------------
airflow/www/app.py | 3 +++
airflow/www_rbac/app.py | 4 ++++
tests/www/test_app.py | 26 ++++++++++++++++++++-
tests/www_rbac/test_app.py | 26 ++++++++++++++++++++-
5 files changed, 87 insertions(+), 29 deletions(-)
diff --git a/airflow/settings.py b/airflow/settings.py
index e39c960..0f35dc5 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -36,7 +36,6 @@ from sqlalchemy.pool import NullPool
from airflow.configuration import conf, AIRFLOW_HOME, WEBSERVER_CONFIG # NOQA F401
from airflow.logging_config import configure_logging
-from airflow.utils.module_loading import import_string
from airflow.utils.sqlalchemy import setup_event_handlers
log = logging.getLogger(__name__)
@@ -233,12 +232,38 @@ def configure_orm(disable_connection_pool=False):
log.debug("Setting up DB connection pool (PID %s)" % os.getpid())
global engine
global Session
- engine_args = {}
+ engine_args = prepare_engine_args(disable_connection_pool)
+
+ # Allow the user to specify an encoding for their DB otherwise default
+ # to utf-8 so jobs & users with non-latin1 characters can still use us.
+ engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING', fallback='utf-8')
+
+ # For Python2 we get back a newstr and need a str
+ engine_args['encoding'] = engine_args['encoding'].__str__()
+
+ if conf.has_option('core', 'sql_alchemy_connect_args'):
+ connect_args = conf.getimport('core', 'sql_alchemy_connect_args')
+ else:
+ connect_args = {}
+
+ engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args)
+ setup_event_handlers(engine)
+
+ Session = scoped_session(sessionmaker(
+ autocommit=False,
+ autoflush=False,
+ bind=engine,
+ expire_on_commit=False,
+ ))
+
+def prepare_engine_args(disable_connection_pool=False):
+ """Prepare SQLAlchemy engine args"""
+ engine_args = {}
pool_connections = conf.getboolean('core', 'SQL_ALCHEMY_POOL_ENABLED')
if disable_connection_pool or not pool_connections:
engine_args['poolclass'] = NullPool
- log.debug("settings.configure_orm(): Using NullPool")
+ log.debug("settings.prepare_engine_args(): Using NullPool")
elif 'sqlite' not in SQL_ALCHEMY_CONN:
# Pool size engine args not supported by sqlite.
# If no config value is defined for the pool size, select a reasonable value.
@@ -270,35 +295,13 @@ def configure_orm(disable_connection_pool=False):
# https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
pool_pre_ping = conf.getboolean('core', 'SQL_ALCHEMY_POOL_PRE_PING', fallback=True)
- log.debug("settings.configure_orm(): Using pool settings. pool_size=%d, max_overflow=%d, "
+ log.debug("settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
"pool_recycle=%d, pid=%d", pool_size, max_overflow, pool_recycle, os.getpid())
engine_args['pool_size'] = pool_size
engine_args['pool_recycle'] = pool_recycle
engine_args['pool_pre_ping'] = pool_pre_ping
engine_args['max_overflow'] = max_overflow
-
- # Allow the user to specify an encoding for their DB otherwise default
- # to utf-8 so jobs & users with non-latin1 characters can still use
- # us.
- engine_args['encoding'] = conf.get('core', 'SQL_ENGINE_ENCODING', fallback='utf-8')
- # For Python2 we get back a newstr and need a str
- engine_args['encoding'] = engine_args['encoding'].__str__()
-
- if conf.has_option('core', 'sql_alchemy_connect_args'):
- connect_args = import_string(
- conf.get('core', 'sql_alchemy_connect_args')
- )
- else:
- connect_args = {}
-
- engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args)
- setup_event_handlers(engine)
-
- Session = scoped_session(
- sessionmaker(autocommit=False,
- autoflush=False,
- bind=engine,
- expire_on_commit=False))
+ return engine_args
def dispose_orm():
diff --git a/airflow/www/app.py b/airflow/www/app.py
index b101f45..425c64c 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -67,6 +67,9 @@ def create_app(config=None, testing=False):
if config:
app.config.from_mapping(config)
+ if 'SQLALCHEMY_ENGINE_OPTIONS' not in app.config:
+ app.config['SQLALCHEMY_ENGINE_OPTIONS'] = settings.prepare_engine_args()
+
csrf.init_app(app)
app.config['TESTING'] = testing
diff --git a/airflow/www_rbac/app.py b/airflow/www_rbac/app.py
index 46ad120..29a364b 100644
--- a/airflow/www_rbac/app.py
+++ b/airflow/www_rbac/app.py
@@ -46,6 +46,7 @@ csrf = CSRFProtect()
log = logging.getLogger(__name__)
+
def create_app(config=None, session=None, testing=False, app_name="Airflow"):
global app, appbuilder
app = Flask(__name__)
@@ -76,6 +77,9 @@ def create_app(config=None, session=None, testing=False, app_name="Airflow"):
if config:
app.config.from_mapping(config)
+ if 'SQLALCHEMY_ENGINE_OPTIONS' not in app.config:
+ app.config['SQLALCHEMY_ENGINE_OPTIONS'] = settings.prepare_engine_args()
+
csrf.init_app(app)
db = SQLA(app)
diff --git a/tests/www/test_app.py b/tests/www/test_app.py
index 64255aa..752c68a 100644
--- a/tests/www/test_app.py
+++ b/tests/www/test_app.py
@@ -16,12 +16,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+import json
import unittest
+import pytest
+import six
from werkzeug.middleware.proxy_fix import ProxyFix
from airflow.www_rbac import app as application
+from tests.compat import mock
from tests.test_utils.config import conf_vars
@@ -55,3 +58,24 @@ class TestApp(unittest.TestCase):
self.assertEqual(app.wsgi_app.x_host, 5)
self.assertEqual(app.wsgi_app.x_port, 6)
self.assertEqual(app.wsgi_app.x_prefix, 7)
+
+ @conf_vars({
+ ('core', 'sql_alchemy_pool_enabled'): 'True',
+ ('core', 'sql_alchemy_pool_size'): '3',
+ ('core', 'sql_alchemy_max_overflow'): '5',
+ ('core', 'sql_alchemy_pool_recycle'): '120',
+ ('core', 'sql_alchemy_pool_pre_ping'): 'True',
+ })
+ @mock.patch("airflow.www.app.app", None)
+ @pytest.mark.backend("mysql", "postgres")
+ def test_should_set_sqlalchemy_engine_options(self):
+ app = application.cached_app(testing=True)
+ engine_params = {
+ 'pool_size': 3,
+ 'pool_recycle': 120,
+ 'pool_pre_ping': True,
+ 'max_overflow': 5
+ }
+ if six.PY2:
+ engine_params = json.dumps(engine_params)
+ self.assertEqual(app.config['SQLALCHEMY_ENGINE_OPTIONS'], engine_params)
diff --git a/tests/www_rbac/test_app.py b/tests/www_rbac/test_app.py
index 71d6255..e16e09b 100644
--- a/tests/www_rbac/test_app.py
+++ b/tests/www_rbac/test_app.py
@@ -16,13 +16,16 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+import json
import unittest
+import pytest
+import six
from werkzeug.middleware.proxy_fix import ProxyFix
from airflow.settings import Session
from airflow.www_rbac import app as application
+from tests.compat import mock
from tests.test_utils.config import conf_vars
@@ -56,3 +59,24 @@ class TestApp(unittest.TestCase):
self.assertEqual(app.wsgi_app.x_host, 5)
self.assertEqual(app.wsgi_app.x_port, 6)
self.assertEqual(app.wsgi_app.x_prefix, 7)
+
+ @conf_vars({
+ ('core', 'sql_alchemy_pool_enabled'): 'True',
+ ('core', 'sql_alchemy_pool_size'): '3',
+ ('core', 'sql_alchemy_max_overflow'): '5',
+ ('core', 'sql_alchemy_pool_recycle'): '120',
+ ('core', 'sql_alchemy_pool_pre_ping'): 'True',
+ })
+ @mock.patch("airflow.www.app.app", None)
+ @pytest.mark.backend("mysql", "postgres")
+ def test_should_set_sqlalchemy_engine_options(self):
+ app = application.cached_app(testing=True)
+ engine_params = {
+ 'pool_size': 3,
+ 'pool_recycle': 120,
+ 'pool_pre_ping': True,
+ 'max_overflow': 5
+ }
+ if six.PY2:
+ engine_params = json.dumps(engine_params)
+ self.assertEqual(app.config['SQLALCHEMY_ENGINE_OPTIONS'], engine_params)