Posted to commits@airflow.apache.org by "Aizhamal Nurmamat kyzy (JIRA)" <ji...@apache.org> on 2019/05/17 20:03:04 UTC

[jira] [Updated] (AIRFLOW-603) db.py pessimistic_connection_handling not compatible with Oracle

     [ https://issues.apache.org/jira/browse/AIRFLOW-603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aizhamal Nurmamat kyzy updated AIRFLOW-603:
-------------------------------------------
    Component/s:     (was: db)
                 database

> db.py pessimistic_connection_handling not compatible with Oracle
> ----------------------------------------------------------------
>
>                 Key: AIRFLOW-603
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-603
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: database
>    Affects Versions: 1.7.1.3
>         Environment: Debian Jessie, Python 2.7
>            Reporter: Vincent BENOIT
>            Priority: Major
>              Labels: db, oracle
>
> There is a problem with how Airflow uses SQLAlchemy.
> The current **pessimistic_connection_handling()** code in /utils/db.py is not compatible with Oracle: a bare "SELECT 1" does not work there, because Oracle expects a FROM clause (for example, FROM DUAL).
> According to the SQLAlchemy documentation,
> [http://docs.sqlalchemy.org/en/latest/core/pooling.html#disconnect-handling-pessimistic] , the current code in Airflow follows the older approach.
> I updated my Airflow source code with the following, as recommended by SQLAlchemy, and it seems fully functional.
> {code:title=db.py|borderStyle=solid}
> def pessimistic_connection_handling():
>     # @event.listens_for(Pool, "checkout")
>     # def ping_connection(dbapi_connection, connection_record, connection_proxy):
>         # '''
>         # Disconnect Handling - Pessimistic, taken from:
>         # http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html
>         # '''
>         # cursor = dbapi_connection.cursor()
>         # try:
>             # cursor.execute("SELECT 1")
>         # except:
>             # raise exc.DisconnectionError()
>         # cursor.close()
>     from sqlalchemy import select
>     @event.listens_for(settings.engine, "engine_connect")
>     def ping_connection(connection, branch):
>         if branch:
>             # "branch" refers to a sub-connection of a connection,
>             # we don't want to bother pinging on these.
>             return
>         # turn off "close with result".  This flag is only used with
>         # "connectionless" execution, otherwise will be False in any case
>         save_should_close_with_result = connection.should_close_with_result
>         connection.should_close_with_result = False
>         try:
>             # run a SELECT 1.   use a core select() so that
>             # the SELECT of a scalar value without a table is
>             # appropriately formatted for the backend
>             connection.scalar(select([1]))
>         except exc.DBAPIError as err:
>             # catch SQLAlchemy's DBAPIError, which is a wrapper
>             # for the DBAPI's exception.  It includes a .connection_invalidated
>             # attribute which specifies if this connection is a "disconnect"
>             # condition, which is based on inspection of the original exception
>             # by the dialect in use.
>             if err.connection_invalidated:
>                 # run the same SELECT again - the connection will re-validate
>                 # itself and establish a new connection.  The disconnect detection
>                 # here also causes the whole connection pool to be invalidated
>                 # so that all stale connections are discarded.
>                 connection.scalar(select([1]))
>             else:
>                 raise
>         finally:
>             # restore "close with result"
>             connection.should_close_with_result = save_should_close_with_result
> {code}
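
The idea behind the report can be illustrated without SQLAlchemy or an Oracle instance. The following is a minimal, standard-library sketch of a pessimistic ping whose statement depends on the backend. It is not Airflow's or SQLAlchemy's code: the names `PING_SQL`, `ping_statement`, and `checkout` are hypothetical, and sqlite3 only stands in for a real database so the example can run anywhere.

```python
import sqlite3

# Hypothetical mapping of backend name to a valid "ping" statement.
# Assumption: Oracle needs a FROM clause, so the ping selects from DUAL.
PING_SQL = {
    "oracle": "SELECT 1 FROM DUAL",
    "default": "SELECT 1",
}


def ping_statement(dialect_name):
    """Return the ping statement for a backend, falling back to the default."""
    return PING_SQL.get(dialect_name, PING_SQL["default"])


def checkout(connect, conn, dialect_name):
    """Pessimistic checkout: ping the connection before handing it out.

    `connect` is a zero-argument callable that opens a fresh connection.
    If the ping fails, the stale connection is replaced with a new one.
    """
    try:
        cursor = conn.cursor()
        cursor.execute(ping_statement(dialect_name))
        cursor.fetchone()
        cursor.close()
        return conn
    except sqlite3.Error:
        # The connection is unusable (here: closed); open a new one.
        return connect()


if __name__ == "__main__":
    def open_db():
        return sqlite3.connect(":memory:")

    live = open_db()
    print(checkout(open_db, live, "sqlite") is live)   # live connection is reused
    live.close()
    print(checkout(open_db, live, "sqlite") is live)   # stale connection is replaced
```

Hard-coding one statement per backend is what the SQLAlchemy recipe quoted above avoids: by building the ping with a core select(), the dialect in use decides how a table-less SELECT is rendered.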



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)