Posted to commits@airflow.apache.org by "Aizhamal Nurmamat kyzy (JIRA)" <ji...@apache.org> on 2019/05/17 20:03:04 UTC
[jira] [Updated] (AIRFLOW-603) db.py pessimistic_connection_handling not compatible with Oracle
[ https://issues.apache.org/jira/browse/AIRFLOW-603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aizhamal Nurmamat kyzy updated AIRFLOW-603:
-------------------------------------------
Component/s: (was: db)
database
> db.py pessimistic_connection_handling not compatible with Oracle
> ----------------------------------------------------------------
>
> Key: AIRFLOW-603
> URL: https://issues.apache.org/jira/browse/AIRFLOW-603
> Project: Apache Airflow
> Issue Type: Bug
> Components: database
> Affects Versions: 1.7.1.3
> Environment: Debian Jessie, Python 2.7
> Reporter: Vincent BENOIT
> Priority: Major
> Labels: db, oracle
>
> There is a problem with how Airflow uses SQLAlchemy.
> The current **pessimistic_connection_handling()** code in /utils/db.py is not compatible with Oracle: a bare "SELECT 1" does not work there, because Oracle expects a FROM clause (for example "SELECT 1 FROM DUAL").
> According to the SQLAlchemy documentation,
> [http://docs.sqlalchemy.org/en/latest/core/pooling.html#disconnect-handling-pessimistic] , the current code in Airflow follows the older approach.
> I updated my Airflow source code with the following, as recommended by SQLAlchemy, and it seems fully functional.
> {code:title=db.py|borderStyle=solid}
> def pessimistic_connection_handling():
>     # @event.listens_for(Pool, "checkout")
>     # def ping_connection(dbapi_connection, connection_record, connection_proxy):
>     #     '''
>     #     Disconnect Handling - Pessimistic, taken from:
>     #     http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html
>     #     '''
>     #     cursor = dbapi_connection.cursor()
>     #     try:
>     #         cursor.execute("SELECT 1")
>     #     except:
>     #         raise exc.DisconnectionError()
>     #     cursor.close()
>     from sqlalchemy import select
>     @event.listens_for(settings.engine, "engine_connect")
>     def ping_connection(connection, branch):
>         if branch:
>             # "branch" refers to a sub-connection of a connection,
>             # we don't want to bother pinging on these.
>             return
>         # turn off "close with result". This flag is only used with
>         # "connectionless" execution, otherwise will be False in any case
>         save_should_close_with_result = connection.should_close_with_result
>         connection.should_close_with_result = False
>         try:
>             # run a SELECT 1. use a core select() so that
>             # the SELECT of a scalar value without a table is
>             # appropriately formatted for the backend
>             connection.scalar(select([1]))
>         except exc.DBAPIError as err:
>             # catch SQLAlchemy's DBAPIError, which is a wrapper
>             # for the DBAPI's exception. It includes a .connection_invalidated
>             # attribute which specifies if this connection is a "disconnect"
>             # condition, which is based on inspection of the original exception
>             # by the dialect in use.
>             if err.connection_invalidated:
>                 # run the same SELECT again - the connection will re-validate
>                 # itself and establish a new connection. The disconnect detection
>                 # here also causes the whole connection pool to be invalidated
>                 # so that all stale connections are discarded.
>                 connection.scalar(select([1]))
>             else:
>                 raise
>         finally:
>             # restore "close with result"
>             connection.should_close_with_result = save_should_close_with_result
> {code}
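The idea behind the change above can be sketched with the Python standard library alone. This is a minimal illustration, not Airflow's or SQLAlchemy's code: the names PING_SQL, ping_statement, is_alive and checkout are hypothetical, and sqlite3 stands in for a real backend. It shows a liveness probe whose SQL is chosen per backend instead of being hardcoded as "SELECT 1".

```python
import sqlite3

# Hypothetical lookup table, for illustration only: the probe statement
# each backend accepts. Oracle has traditionally required a FROM clause,
# hence the DUAL dummy table.
PING_SQL = {
    "oracle": "SELECT 1 FROM DUAL",
    "default": "SELECT 1",
}


def ping_statement(dialect_name):
    """Return the probe statement for a dialect name (hypothetical helper)."""
    return PING_SQL.get(dialect_name, PING_SQL["default"])


def is_alive(dbapi_connection, dialect_name):
    """Run the probe on a raw DBAPI connection.

    Returns False if the probe raises for any reason, which includes
    both a dead connection and a statement the backend rejects.
    """
    try:
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute(ping_statement(dialect_name))
            cursor.fetchone()
        finally:
            cursor.close()
    except Exception:
        return False
    return True


def checkout(dbapi_connection, connect, dialect_name):
    """Return a usable connection, reconnecting once if the probe fails."""
    if is_alive(dbapi_connection, dialect_name):
        return dbapi_connection
    return connect()


def connect():
    # sqlite3 in-memory database used as a stand-in backend
    return sqlite3.connect(":memory:")


conn = connect()
healthy = checkout(conn, connect, "sqlite")      # probe passes, same object back
conn.close()
replaced = checkout(conn, connect, "sqlite")     # probe fails, new connection
wrong_probe = is_alive(replaced, "oracle")       # healthy, but probe is rejected
```

The last line mirrors the reported bug in reverse: a probe the backend does not accept makes a healthy connection look dead, which is what a hardcoded "SELECT 1" does on Oracle. Letting the library render the probe for the backend in use, as select([1]) does in the listing above, avoids maintaining such a table by hand.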
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)