You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Vincent BENOIT (JIRA)" <ji...@apache.org> on 2016/10/28 09:16:58 UTC

[jira] [Updated] (AIRFLOW-603) db.py pessimistic_connection_handling not compatible with Oracle

     [ https://issues.apache.org/jira/browse/AIRFLOW-603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vincent BENOIT updated AIRFLOW-603:
-----------------------------------
    Description: 
There is a problem with the way Airflow uses SQLAlchemy.

The current **pessimistic_connection_handling()** code in /utils/db.py is not compatible with Oracle: a bare "SELECT 1" is not valid there, because Oracle requires a FROM clause (e.g. "SELECT 1 FROM DUAL").

According to the SQLAlchemy documentation
[http://docs.sqlalchemy.org/en/latest/core/pooling.html#disconnect-handling-pessimistic], the current code in Airflow uses the older approach.

I updated my Airflow source code with the following recipe, which is the one recommended by the SQLAlchemy documentation, and it seems to be fully functional.

{code:title=db.py|borderStyle=solid}
def pessimistic_connection_handling():
	'''
	Register a "ping" listener for the "engine_connect" event of
	``settings.engine``.

	The nested ``ping_connection`` listener issues a ``select([1])``
	through the connection and, if that fails with a disconnect-type
	``exc.DBAPIError``, issues it a second time.  A core ``select()`` is
	used instead of a literal "SELECT 1" string so that the statement is
	formatted appropriately for the backend in use.

	Takes no arguments and returns nothing; its only effect is
	registering the listener.

	NOTE(review): ``event``, ``exc`` and ``settings`` are not defined in
	this block -- presumably ``sqlalchemy.event``, ``sqlalchemy.exc`` and
	Airflow's settings module; confirm against the module imports.
	'''
    # The commented-out code below is the earlier approach (a Pool
    # "checkout" listener running a raw "SELECT 1" on the DBAPI cursor);
    # it is inactive and kept only for comparison.
    # @event.listens_for(Pool, "checkout")
    # def ping_connection(dbapi_connection, connection_record, connection_proxy):
        # '''
        # Disconnect Handling - Pessimistic, taken from:
        # http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html
        # '''
        # cursor = dbapi_connection.cursor()
        # try:
            # cursor.execute("SELECT 1")
        # except:
            # raise exc.DisconnectionError()
        # cursor.close()
	# imported locally, so the snippet does not rely on a module-level
	# import of select
	from sqlalchemy import select
	@event.listens_for(settings.engine, "engine_connect")
	def ping_connection(connection, branch):
		'''
		Ping ``connection`` with ``select([1])``, retrying once on a
		detected disconnect.

		:param connection: the connection to test; its
			``should_close_with_result`` flag is cleared for the duration
			of the ping and restored afterwards.
		:param branch: when truthy the listener returns immediately
			without pinging.
		:raises exc.DBAPIError: re-raised when the failed ping is not
			flagged as ``connection_invalidated``; an error from the
			second ping attempt is not caught here either.
		'''
		if branch:
			# "branch" refers to a sub-connection of a connection,
			# we don't want to bother pinging on these.
			return

		# turn off "close with result".  This flag is only used with
		# "connectionless" execution, otherwise will be False in any case
		save_should_close_with_result = connection.should_close_with_result
		connection.should_close_with_result = False

		try:
			# run a SELECT 1.   use a core select() so that
			# the SELECT of a scalar value without a table is
			# appropriately formatted for the backend
			connection.scalar(select([1]))
		except exc.DBAPIError as err:
			# catch SQLAlchemy's DBAPIError, which is a wrapper
			# for the DBAPI's exception.  It includes a .connection_invalidated
			# attribute which specifies if this connection is a "disconnect"
			# condition, which is based on inspection of the original exception
			# by the dialect in use.
			if err.connection_invalidated:
				# run the same SELECT again - the connection will re-validate
				# itself and establish a new connection.  The disconnect detection
				# here also causes the whole connection pool to be invalidated
				# so that all stale connections are discarded.
				connection.scalar(select([1]))
			else:
				# not a disconnect: let the original error propagate
				raise
		finally:
			# restore "close with result"
			connection.should_close_with_result = save_should_close_with_result
{code}

  was:
There is a problem with airflow's implementation of sqlalchemy.

Current **pessimistic_connection_handling()** code in /utils/db.py is not compatible with Oracle ("SELECT 1" is non functional in Oracle).

According to sqlalchemy documentation,
[http://docs.sqlalchemy.org/en/latest/core/pooling.html#disconnect-handling-pessimistic] , the current code in Airflow is the older approach.

I updated my Airflow source code with the following, recommended  by sqlalechemy, and it seems fully functional.

{code:title=db.py|borderStyle=solid}
def pessimistic_connection_handling():
	'''
	Register a "ping" listener for the "engine_connect" event of
	``settings.engine``.

	The nested ``ping_connection`` listener issues a ``select([1])``
	through the connection and, if that fails with a disconnect-type
	``exc.DBAPIError``, issues it a second time.  A core ``select()`` is
	used instead of a literal "SELECT 1" string so that the statement is
	formatted appropriately for the backend in use.

	Takes no arguments and returns nothing; its only effect is
	registering the listener.

	NOTE(review): ``event``, ``exc`` and ``settings`` are not defined in
	this block -- presumably ``sqlalchemy.event``, ``sqlalchemy.exc`` and
	Airflow's settings module; confirm against the module imports.
	'''
    # The commented-out code below is the earlier approach (a Pool
    # "checkout" listener running a raw "SELECT 1" on the DBAPI cursor);
    # it is inactive and kept only for comparison.
    # @event.listens_for(Pool, "checkout")
    # def ping_connection(dbapi_connection, connection_record, connection_proxy):
        # '''
        # Disconnect Handling - Pessimistic, taken from:
        # http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html
        # '''
        # cursor = dbapi_connection.cursor()
        # try:
            # cursor.execute("SELECT 1")
        # except:
            # raise exc.DisconnectionError()
        # cursor.close()
	# imported locally, so the snippet does not rely on a module-level
	# import of select
	from sqlalchemy import select
	@event.listens_for(settings.engine, "engine_connect")
	def ping_connection(connection, branch):
		'''
		Ping ``connection`` with ``select([1])``, retrying once on a
		detected disconnect.

		:param connection: the connection to test; its
			``should_close_with_result`` flag is cleared for the duration
			of the ping and restored afterwards.
		:param branch: when truthy the listener returns immediately
			without pinging.
		:raises exc.DBAPIError: re-raised when the failed ping is not
			flagged as ``connection_invalidated``; an error from the
			second ping attempt is not caught here either.
		'''
		if branch:
			# "branch" refers to a sub-connection of a connection,
			# we don't want to bother pinging on these.
			return

		# turn off "close with result".  This flag is only used with
		# "connectionless" execution, otherwise will be False in any case
		save_should_close_with_result = connection.should_close_with_result
		connection.should_close_with_result = False

		try:
			# run a SELECT 1.   use a core select() so that
			# the SELECT of a scalar value without a table is
			# appropriately formatted for the backend
			connection.scalar(select([1]))
		except exc.DBAPIError as err:
			# catch SQLAlchemy's DBAPIError, which is a wrapper
			# for the DBAPI's exception.  It includes a .connection_invalidated
			# attribute which specifies if this connection is a "disconnect"
			# condition, which is based on inspection of the original exception
			# by the dialect in use.
			if err.connection_invalidated:
				# run the same SELECT again - the connection will re-validate
				# itself and establish a new connection.  The disconnect detection
				# here also causes the whole connection pool to be invalidated
				# so that all stale connections are discarded.
				connection.scalar(select([1]))
			else:
				# not a disconnect: let the original error propagate
				raise
		finally:
			# restore "close with result"
			connection.should_close_with_result = save_should_close_with_result
{code}


> db.py pessimistic_connection_handling not compatible with Oracle
> ----------------------------------------------------------------
>
>                 Key: AIRFLOW-603
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-603
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: db
>    Affects Versions: Airflow 1.7.1.3
>         Environment: Debian Jessie, Python 2.7
>            Reporter: Vincent BENOIT
>              Labels: db, oracle
>
> There is a problem with airflow's implementation of sqlalchemy.
> Current **pessimistic_connection_handling()** code in /utils/db.py is not compatible with Oracle ("SELECT 1" is non functional in Oracle).
> According to sqlalchemy documentation,
> [http://docs.sqlalchemy.org/en/latest/core/pooling.html#disconnect-handling-pessimistic] , the current code in Airflow is the older approach.
> I updated my Airflow source code with the following, recommended  by sqlalchemy, and it seems fully functional.
> {code:title=db.py|borderStyle=solid}
> def pessimistic_connection_handling():
>     # @event.listens_for(Pool, "checkout")
>     # def ping_connection(dbapi_connection, connection_record, connection_proxy):
>         # '''
>         # Disconnect Handling - Pessimistic, taken from:
>         # http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html
>         # '''
>         # cursor = dbapi_connection.cursor()
>         # try:
>             # cursor.execute("SELECT 1")
>         # except:
>             # raise exc.DisconnectionError()
>         # cursor.close()
> 	from sqlalchemy import select
> 	@event.listens_for(settings.engine, "engine_connect")
> 	def ping_connection(connection, branch):
> 		if branch:
> 			# "branch" refers to a sub-connection of a connection,
> 			# we don't want to bother pinging on these.
> 			return
> 		# turn off "close with result".  This flag is only used with
> 		# "connectionless" execution, otherwise will be False in any case
> 		save_should_close_with_result = connection.should_close_with_result
> 		connection.should_close_with_result = False
> 		try:
> 			# run a SELECT 1.   use a core select() so that
> 			# the SELECT of a scalar value without a table is
> 			# appropriately formatted for the backend
> 			connection.scalar(select([1]))
> 		except exc.DBAPIError as err:
> 			# catch SQLAlchemy's DBAPIError, which is a wrapper
> 			# for the DBAPI's exception.  It includes a .connection_invalidated
> 			# attribute which specifies if this connection is a "disconnect"
> 			# condition, which is based on inspection of the original exception
> 			# by the dialect in use.
> 			if err.connection_invalidated:
> 				# run the same SELECT again - the connection will re-validate
> 				# itself and establish a new connection.  The disconnect detection
> 				# here also causes the whole connection pool to be invalidated
> 				# so that all stale connections are discarded.
> 				connection.scalar(select([1]))
> 			else:
> 				raise
> 		finally:
> 			# restore "close with result"
> 			connection.should_close_with_result = save_should_close_with_result
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)