Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/12 11:41:26 UTC

[GitHub] [airflow] bauerfranz opened a new issue, #22947: closing connection chunks in DbApiHook.get_pandas_df

bauerfranz opened a new issue, #22947:
URL: https://github.com/apache/airflow/issues/22947

   ### Apache Airflow version
   
   2.2.5 (latest released)
   
   ### What happened
   
   Hi all,
   Please be patient with me, this is my first bug report on GitHub :)
   
   **Affected function:** DbApiHook.get_pandas_df
   
   **Short description**: If I call DbApiHook.get_pandas_df with the "chunksize" parameter, the connection is lost.
   
   **Error description**
   I tried using the DbApiHook.get_pandas_df function instead of pandas.read_sql. Without the "chunksize" parameter, both functions behave the same. But as soon as I pass chunksize to get_pandas_df, I lose the connection in the first iteration. This happens when querying both Oracle and MySQL (MariaDB) databases.
   
   During my research I found a comment on a closed issue that describes the same problem: [#8468](https://github.com/apache/airflow/issues/8468)
   
   
   My Airflow version: 2.2.5
   Code example:
   
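   import pandas
   from airflow.providers.oracle.hooks.oracle import OracleHook
   
   src_hook = OracleHook(oracle_conn_id='oracle_source_conn_id')
   query = "select * from example_table"
   
   # not working: with chunksize, iterating fails with "not connected"
   for chunk in src_hook.get_pandas_df(query, chunksize=1000):
       print(chunk.head())
   
   # works: without chunksize a single DataFrame is returned
   df = src_hook.get_pandas_df(query)
   print(df.head())
   
   # works: pandas.read_sql on a connection that stays open
   for chunk in pandas.read_sql(query, src_hook.get_conn(), chunksize=1000):
       print(chunk.head())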
   
   **Error Message:**
     File "/.../airflow/venv/lib/python3.9/site-packages/pandas/io/sql.py", line 2083, in _query_iterator
       data = cursor.fetchmany(chunksize)
   cx_Oracle.InterfaceError: not connected
   
   
   I think it has something to do with the "with closing" block, because when I remove it, the chunksize argument works. The current implementation:
   def get_pandas_df(self, sql, parameters=None, **kwargs):
       """
       Executes the sql and returns a pandas dataframe
       :param sql: the sql statement to be executed (str) or a list of
           sql statements to execute
       :param parameters: The parameters to render the SQL query with.
       :param kwargs: (optional) passed into pandas.io.sql.read_sql method
       """
       try:
           from pandas.io import sql as psql
       except ImportError:
           raise Exception("pandas library not installed, run: pip install 'apache-airflow[pandas]'.")
       # Not working with chunksize: closing() closes the connection as soon
       # as this function returns, before the returned chunks are read.
       with closing(self.get_conn()) as conn:
           return psql.read_sql(sql, con=conn, params=parameters, **kwargs)
       # Would work with chunksize (no "with closing"):
       # return psql.read_sql(sql, con=self.get_conn(), params=parameters, **kwargs)
   
   
   ### What you think should happen instead
   
   Each iteration should give me one chunk of the result as a DataFrame.
   
   ### How to reproduce
   
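   import pandas
   from airflow.providers.oracle.hooks.oracle import OracleHook
   
   src_hook = OracleHook(oracle_conn_id='oracle_source_conn_id')
   query = "select * from example_table"
   
   # not working: with chunksize, iterating fails with "not connected"
   for chunk in src_hook.get_pandas_df(query, chunksize=2):
       print(chunk.head())
   
   # works: without chunksize a single DataFrame is returned
   df = src_hook.get_pandas_df(query)
   print(df.head())
   
   # works: pandas.read_sql on a connection that stays open
   for chunk in pandas.read_sql(query, src_hook.get_conn(), chunksize=2):
       print(chunk.head())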
   
   
   ### Operating System
   
   macOS Monterey
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow                           2.2.5
   apache-airflow-providers-ftp             2.1.2
   apache-airflow-providers-http            2.1.2
   apache-airflow-providers-imap            2.2.3
   apache-airflow-providers-microsoft-mssql 2.1.3
   apache-airflow-providers-mongo           2.3.3
   apache-airflow-providers-mysql           2.2.3
   apache-airflow-providers-oracle          2.2.3
   apache-airflow-providers-salesforce      3.4.3
   apache-airflow-providers-sftp            2.5.2
   apache-airflow-providers-sqlite          2.1.3
   apache-airflow-providers-ssh             2.4.3
   
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
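
The mechanism suspected in the report can be reproduced without Airflow, Oracle, or pandas. The following is a minimal sketch that uses only the standard library's `sqlite3`. The names `make_conn`, `read_chunks`, `broken_read`, and `working_read` are hypothetical and introduced purely for illustration; they are not part of Airflow or pandas. `read_chunks` only imitates the lazy behaviour of a chunked `read_sql`: it runs the query immediately but fetches rows when the caller iterates.

```python
import sqlite3
from contextlib import closing


def make_conn():
    """Hypothetical stand-in for hook.get_conn(): a fresh in-memory database."""
    conn = sqlite3.connect(":memory:")
    conn.execute("create table users(id int, name text)")
    conn.executemany(
        "insert into users values (?, ?)", [(1, "a"), (2, "b"), (3, "c")]
    )
    return conn


def read_chunks(conn, sql, chunksize):
    """Run the query now, but fetch the rows lazily, chunk by chunk."""
    cursor = conn.execute(sql)

    def _iterator():
        while True:
            rows = cursor.fetchmany(chunksize)
            if not rows:
                break
            yield rows

    return _iterator()


def broken_read(sql, chunksize):
    # Same shape as the reported code: the generator is returned unconsumed,
    # and closing() closes the connection as the with block exits.
    with closing(make_conn()) as conn:
        return read_chunks(conn, sql, chunksize)


def working_read(sql, chunksize):
    # No closing(): the connection stays open while the caller iterates,
    # but nothing in this sketch closes it afterwards.
    return read_chunks(make_conn(), sql, chunksize)


print(list(working_read("select * from users", 2)))

try:
    list(broken_read("select * from users", 2))
except sqlite3.ProgrammingError as exc:
    print("broken_read failed:", exc)
```

`broken_read` mirrors the `with closing(...)` plus `return` pattern in `get_pandas_df`: the `with` block exits on `return`, so the connection is already closed by the time the caller asks for the first chunk.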
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] tirkarthi commented on issue #22947: closing connection chunks in DbApiHook.get_pandas_df

Posted by GitBox <gi...@apache.org>.
tirkarthi commented on issue #22947:
URL: https://github.com/apache/airflow/issues/22947#issuecomment-1096726667

   Below is a test case explaining the problem. When `chunksize` is passed, `get_pandas_df` returns a generator instead of a dataframe, and that generator references a connection that has already been closed. When we then iterate over the generator, it uses the closed connection and raises an error. Whether the connection gets closed also depends on the type of `con`, as described below.
   
   https://pandas.pydata.org/docs/reference/api/pandas.read_sql.html#pandas-read-sql
   
   > con: SQLAlchemy connectable, str, or sqlite3 connection
   >    Using SQLAlchemy makes it possible to use any DB supported by that library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible for engine disposal and connection closure for the SQLAlchemy connectable; str connections are closed automatically. See [here](https://docs.sqlalchemy.org/en/13/core/connections.html).
   
   https://github.com/pandas-dev/pandas/blob/c90294db40cd48fca7fbec5fa419b46a4b4768a1/pandas/io/sql.py#L991
   
   ```python
   def test_get_pandas_df_chunksize(self):
       import sqlite3
   
       class UnitTestSqliteHook(SqliteHook):
           conn_name_attr = 'test_conn_id'
           log = mock.MagicMock()
   
           def setup_table(self):
               self.conn = sqlite3.connect(":memory:")
               cursor = self.conn.cursor()
               cursor.execute("create table users(id int, name text)")
               cursor.execute("insert into users(id, name) values(1, 'a')")
               cursor.close()
   
           def get_conn(self):
               self.setup_table()
               return self.conn
   
       self.db_hook = UnitTestSqliteHook()
   
       statement = 'select * from users'
       df = list(self.db_hook.get_pandas_df(statement, chunksize=1))
   
       assert df[0].columns[0] == 'id'
       assert df[0].values.tolist()[0][0] == 1
       assert df[0].values.tolist()[0][1] == 'a'
   ```
   
   ```
   pytest tests/providers/sqlite/hooks/test_sqlite.py -k test_get_pandas_df_chunksize
   ============================================================================ test session starts ============================================================================
   platform linux -- Python 3.10.4, pytest-6.2.5, py-1.11.0, pluggy-1.0.0 -- /usr/local/bin/python
   cachedir: .pytest_cache
   rootdir: /opt/airflow, configfile: pytest.ini
   plugins: flaky-3.7.0, forked-1.4.0, cov-3.0.0, anyio-3.5.0, requests-mock-1.9.3, instafail-0.4.2, timeouts-1.2.1, rerunfailures-9.1.1, xdist-2.5.0, httpx-0.20.0, asyncio-0.18.3
   asyncio: mode=strict
   setup timeout: 0.0s, execution timeout: 0.0s, teardown timeout: 0.0s
   collected 9 items / 8 deselected / 1 selected                                                                                                                               
   
   tests/providers/sqlite/hooks/test_sqlite.py::TestSqliteHook::test_get_pandas_df_chunksize FAILED                                                                      [100%]
   
   ================================================================================= FAILURES ==================================================================================
   ________________________________________________________________ TestSqliteHook.test_get_pandas_df_chunksize ________________________________________________________________
   
   self = <tests.providers.sqlite.hooks.test_sqlite.TestSqliteHook testMethod=test_get_pandas_df_chunksize>
   
       def test_get_pandas_df_chunksize(self):
           import sqlite3
       
           class UnitTestSqliteHook(SqliteHook):
               conn_name_attr = 'test_conn_id'
               log = mock.MagicMock()
       
               def setup_table(self):
                   self.conn = sqlite3.connect(":memory:")
                   cursor = self.conn.cursor()
                   cursor.execute("create table users(id int, name text)")
                   cursor.execute("insert into users(id, name) values(1, 'a')")
                   cursor.close()
       
               def get_conn(self):
                   self.setup_table()
                   return self.conn
       
           self.db_hook = UnitTestSqliteHook()
       
           statement = 'select * from users'
   >       df = list(self.db_hook.get_pandas_df(statement, chunksize=1))
   
   tests/providers/sqlite/hooks/test_sqlite.py:126: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   
   cursor = <sqlite3.Cursor object at 0x7fede21680c0>, chunksize = 1, columns = ['id', 'name'], index_col = None, coerce_float = True, parse_dates = None, dtype = None
   
       @staticmethod
       def _query_iterator(
           cursor,
           chunksize: int,
           columns,
           index_col=None,
           coerce_float: bool = True,
           parse_dates=None,
           dtype: DtypeArg | None = None,
       ):
           """Return generator through chunked result set"""
           has_read_data = False
           while True:
   >           data = cursor.fetchmany(chunksize)
   E           sqlite3.ProgrammingError: Cannot operate on a closed database.
   
   /usr/local/lib/python3.10/site-packages/pandas/io/sql.py:2047: ProgrammingError
   --------------------------------------------------------------------------- Captured stdout setup ---------------------------------------------------------------------------
   ========================= AIRFLOW ==========================
   Home of the user: /root
   Airflow home /root/airflow
   Skipping initializing of the DB as it was initialized already.
   You can re-initialize the database by adding --with-db-init flag when running tests.
   ============================================================================= warnings summary ==============================================================================
   airflow/configuration.py:407
     /opt/airflow/airflow/configuration.py:407: FutureWarning: The 'dag_default_view' setting in [webserver] has the old default value of 'tree'. This value has been changed to 'grid' in the running config, but please update your config before Apache Airflow 3.0.
       warnings.warn(
   
   airflow/configuration.py:407
     /opt/airflow/airflow/configuration.py:407: FutureWarning: The 'log_filename_template' setting in [logging] has the old default value of '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'. This value has been changed to 'dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log' in the running config, but please update your config before Apache Airflow 3.0.
       warnings.warn(
   
   -- Docs: https://docs.pytest.org/en/stable/warnings.html
   ========================================================================== short test summary info ==========================================================================
   FAILED tests/providers/sqlite/hooks/test_sqlite.py::TestSqliteHook::test_get_pandas_df_chunksize - sqlite3.ProgrammingError: Cannot operate on a closed database.
   ================================================================ 1 failed, 8 deselected, 2 warnings in 0.90s ================================================================
   
   ```
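
One possible way to avoid the failure shown above, sketched here as an assumption rather than as the change made in Airflow, is to turn the chunked read into a generator function so that the `with closing(...)` block stays open while chunks are consumed. The example uses only the standard library's `sqlite3`; `make_conn` and `read_by_chunks` are hypothetical names introduced for illustration, and `make_conn` stands in for `get_conn`.

```python
import sqlite3
from contextlib import closing


def make_conn():
    """Hypothetical stand-in for hook.get_conn(): a fresh in-memory database."""
    conn = sqlite3.connect(":memory:")
    conn.execute("create table users(id int, name text)")
    conn.executemany(
        "insert into users values (?, ?)", [(1, "a"), (2, "b"), (3, "c")]
    )
    return conn


def read_by_chunks(get_conn, sql, chunksize):
    """Yield lists of rows while keeping the connection open.

    Because this is a generator function, the with block is entered on the
    first next() call and exited only when the generator is exhausted or
    closed, so closing() no longer runs before the rows are fetched.
    """
    with closing(get_conn()) as conn:
        cursor = conn.execute(sql)
        while True:
            rows = cursor.fetchmany(chunksize)
            if not rows:
                break
            yield rows


for chunk in read_by_chunks(make_conn, "select * from users", 2):
    print(chunk)
```

The trade-off is that such a function always returns a generator, so it cannot also return a single dataframe when no `chunksize` is given; a separate method for the chunked case would be one way to keep both behaviours.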




[GitHub] [airflow] boring-cyborg[bot] commented on issue #22947: closing connection chunks in DbApiHook.get_pandas_df

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #22947:
URL: https://github.com/apache/airflow/issues/22947#issuecomment-1096615636

   Thanks for opening your first issue here! Be sure to follow the issue template!
   




[GitHub] [airflow] potiuk closed issue #22947: closing connection chunks in DbApiHook.get_pandas_df

Posted by GitBox <gi...@apache.org>.
potiuk closed issue #22947: closing connection chunks in DbApiHook.get_pandas_df
URL: https://github.com/apache/airflow/issues/22947




[GitHub] [airflow] potiuk commented on issue #22947: closing connection chunks in DbApiHook.get_pandas_df

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #22947:
URL: https://github.com/apache/airflow/issues/22947#issuecomment-1098577692

   Why don't you attempt to fix it?




[GitHub] [airflow] hubert-pietron commented on issue #22947: closing connection chunks in DbApiHook.get_pandas_df

Posted by GitBox <gi...@apache.org>.
hubert-pietron commented on issue #22947:
URL: https://github.com/apache/airflow/issues/22947#issuecomment-1103533615

   If @bauerfranz is not interested in fixing this bug, I would like to work on it.




[GitHub] [airflow] potiuk commented on issue #22947: closing connection chunks in DbApiHook.get_pandas_df

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #22947:
URL: https://github.com/apache/airflow/issues/22947#issuecomment-1103538011

   Assigned you.

