Posted to commits@airflow.apache.org by "Daniel Imberman (Jira)" <ji...@apache.org> on 2020/03/29 15:32:00 UTC

[jira] [Commented] (AIRFLOW-603) db.py pessimistic_connection_handling not compatible with Oracle

    [ https://issues.apache.org/jira/browse/AIRFLOW-603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17070395#comment-17070395 ] 

Daniel Imberman commented on AIRFLOW-603:
-----------------------------------------

[~vbenoit] is this still an issue?

> db.py pessimistic_connection_handling not compatible with Oracle
> ----------------------------------------------------------------
>
>                 Key: AIRFLOW-603
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-603
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: database
>    Affects Versions: 1.7.1.3
>         Environment: Debian Jessie, Python 2.7
>            Reporter: Vincent BENOIT
>            Priority: Major
>              Labels: db, oracle
>
> Airflow's SQLAlchemy connection ping does not work with Oracle.
> The current **pessimistic_connection_handling()** code in /utils/db.py issues a raw "SELECT 1", which is not valid in Oracle (Oracle requires a FROM clause, e.g. "SELECT 1 FROM DUAL").
> According to the SQLAlchemy documentation,
> [http://docs.sqlalchemy.org/en/latest/core/pooling.html#disconnect-handling-pessimistic] , the approach currently used in Airflow is the older one.
> I updated my Airflow source code with the following version, recommended by SQLAlchemy, and it seems fully functional.
> {code:title=db.py|borderStyle=solid}
> def pessimistic_connection_handling():
>     # @event.listens_for(Pool, "checkout")
>     # def ping_connection(dbapi_connection, connection_record, connection_proxy):
>         # '''
>         # Disconnect Handling - Pessimistic, taken from:
>         # http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html
>         # '''
>         # cursor = dbapi_connection.cursor()
>         # try:
>             # cursor.execute("SELECT 1")
>         # except:
>             # raise exc.DisconnectionError()
>         # cursor.close()
>     from sqlalchemy import select
>     @event.listens_for(settings.engine, "engine_connect")
>     def ping_connection(connection, branch):
>         if branch:
>             # "branch" refers to a sub-connection of a connection,
>             # we don't want to bother pinging on these.
>             return
>         # turn off "close with result".  This flag is only used with
>         # "connectionless" execution, otherwise will be False in any case
>         save_should_close_with_result = connection.should_close_with_result
>         connection.should_close_with_result = False
>         try:
>             # run a SELECT 1.  use a core select() so that
>             # the SELECT of a scalar value without a table is
>             # appropriately formatted for the backend
>             connection.scalar(select([1]))
>         except exc.DBAPIError as err:
>             # catch SQLAlchemy's DBAPIError, which is a wrapper
>             # for the DBAPI's exception.  It includes a .connection_invalidated
>             # attribute which specifies if this connection is a "disconnect"
>             # condition, which is based on inspection of the original exception
>             # by the dialect in use.
>             if err.connection_invalidated:
>                 # run the same SELECT again - the connection will re-validate
>                 # itself and establish a new connection.  The disconnect detection
>                 # here also causes the whole connection pool to be invalidated
>                 # so that all stale connections are discarded.
>                 connection.scalar(select([1]))
>             else:
>                 raise
>         finally:
>             # restore "close with result"
>             connection.should_close_with_result = save_should_close_with_result
> {code}
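
For readers unfamiliar with the pattern, here is a minimal, stdlib-only sketch of pessimistic disconnect handling with a backend-specific ping statement. It is not Airflow or SQLAlchemy code: `PING_SQL`, `ping_statement` and `checkout` are hypothetical names invented for illustration, and sqlite3 merely stands in for a real backend. SQLAlchemy derives the ping statement from the dialect itself rather than from a lookup table like this one.

```python
import sqlite3

# Hypothetical lookup table, for illustration only. Oracle needs a FROM
# clause, so its ping selects from the DUAL dummy table.
PING_SQL = {
    "oracle": "SELECT 1 FROM DUAL",
}
DEFAULT_PING_SQL = "SELECT 1"


def ping_statement(dialect_name):
    """Return the ping query assumed to be valid for the given backend."""
    return PING_SQL.get(dialect_name, DEFAULT_PING_SQL)


def checkout(conn, connect, dialect_name="sqlite"):
    """Ping `conn` before use; if the ping fails, open a fresh connection.

    `connect` is a zero-argument callable that returns a new connection.
    """
    try:
        cursor = conn.cursor()
        cursor.execute(ping_statement(dialect_name))
        cursor.fetchone()
        cursor.close()
        return conn
    except sqlite3.Error:
        # The connection is unusable (here: closed), so replace it.
        return connect()
```

A live connection is returned unchanged, while a closed one is swapped for a new connection obtained from `connect`. A real pool would also discard its other stale connections, as the comments in the quoted code describe.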


