You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/12 13:26:08 UTC

[GitHub] [airflow] tirkarthi commented on issue #22947: closing connection chunks in DbApiHook.get_pandas_df

tirkarthi commented on issue #22947:
URL: https://github.com/apache/airflow/issues/22947#issuecomment-1096726667

   Below is a test case explaining the problem. The `get_pandas_df` method in case of `chunksize` being passed returns a generator instead of a dataframe that references to the connection closed. When we try to iterate through then the generator uses a closed connection giving error. Also the behavior of closing the connection depends on the datatype as below. 
   
   https://pandas.pydata.org/docs/reference/api/pandas.read_sql.html#pandas-read-sql
   
   > con: SQLAlchemy connectable, str, or sqlite3 connection
   >    Using SQLAlchemy makes it possible to use any DB supported by that library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible for engine disposal and connection closure for the SQLAlchemy connectable; str connections are closed automatically. See [here](https://docs.sqlalchemy.org/en/13/core/connections.html).
   
   https://github.com/pandas-dev/pandas/blob/c90294db40cd48fca7fbec5fa419b46a4b4768a1/pandas/io/sql.py#L991
   
   ```python
   def test_get_pandas_df_chunksize(self):
       import sqlite3
   
       class UnitTestSqliteHook(SqliteHook):
           conn_name_attr = 'test_conn_id'
           log = mock.MagicMock()
   
           def setup_table(self):
               self.conn = sqlite3.connect(":memory:")
               cursor = self.conn.cursor()
               cursor.execute("create table users(id int, name text)")
               cursor.execute("insert into users(id, name) values(1, 'a')")
               cursor.close()
   
           def get_conn(self):
               self.setup_table()
               return self.conn
   
       self.db_hook = UnitTestSqliteHook()
   
       statement = 'select * from users'
       df = list(self.db_hook.get_pandas_df(statement, chunksize=1))
   
       assert df[0].columns[0] == 'id'
       assert df[0].values.tolist()[0][0] == 1
       assert df[0].values.tolist()[0][1] == 'a'
   ```
   
   ```
   pytest tests/providers/sqlite/hooks/test_sqlite.py -k test_get_pandas_df_chunksize
   ============================================================================ test session starts ============================================================================
   platform linux -- Python 3.10.4, pytest-6.2.5, py-1.11.0, pluggy-1.0.0 -- /usr/local/bin/python
   cachedir: .pytest_cache
   rootdir: /opt/airflow, configfile: pytest.ini
   plugins: flaky-3.7.0, forked-1.4.0, cov-3.0.0, anyio-3.5.0, requests-mock-1.9.3, instafail-0.4.2, timeouts-1.2.1, rerunfailures-9.1.1, xdist-2.5.0, httpx-0.20.0, asyncio-0.18.3
   asyncio: mode=strict
   setup timeout: 0.0s, execution timeout: 0.0s, teardown timeout: 0.0s
   collected 9 items / 8 deselected / 1 selected                                                                                                                               
   
   tests/providers/sqlite/hooks/test_sqlite.py::TestSqliteHook::test_get_pandas_df_chunksize FAILED                                                                      [100%]
   
   ================================================================================= FAILURES ==================================================================================
   ________________________________________________________________ TestSqliteHook.test_get_pandas_df_chunksize ________________________________________________________________
   
   self = <tests.providers.sqlite.hooks.test_sqlite.TestSqliteHook testMethod=test_get_pandas_df_chunksize>
   
       def test_get_pandas_df_chunksize(self):
           import sqlite3
       
           class UnitTestSqliteHook(SqliteHook):
               conn_name_attr = 'test_conn_id'
               log = mock.MagicMock()
       
               def setup_table(self):
                   self.conn = sqlite3.connect(":memory:")
                   cursor = self.conn.cursor()
                   cursor.execute("create table users(id int, name text)")
                   cursor.execute("insert into users(id, name) values(1, 'a')")
                   cursor.close()
       
               def get_conn(self):
                   self.setup_table()
                   return self.conn
       
           self.db_hook = UnitTestSqliteHook()
       
           statement = 'select * from users'
   >       df = list(self.db_hook.get_pandas_df(statement, chunksize=1))
   
   tests/providers/sqlite/hooks/test_sqlite.py:126: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   
   cursor = <sqlite3.Cursor object at 0x7fede21680c0>, chunksize = 1, columns = ['id', 'name'], index_col = None, coerce_float = True, parse_dates = None, dtype = None
   
       @staticmethod
       def _query_iterator(
           cursor,
           chunksize: int,
           columns,
           index_col=None,
           coerce_float: bool = True,
           parse_dates=None,
           dtype: DtypeArg | None = None,
       ):
           """Return generator through chunked result set"""
           has_read_data = False
           while True:
   >           data = cursor.fetchmany(chunksize)
   E           sqlite3.ProgrammingError: Cannot operate on a closed database.
   
   /usr/local/lib/python3.10/site-packages/pandas/io/sql.py:2047: ProgrammingError
   --------------------------------------------------------------------------- Captured stdout setup ---------------------------------------------------------------------------
   ========================= AIRFLOW ==========================
   Home of the user: /root
   Airflow home /root/airflow
   Skipping initializing of the DB as it was initialized already.
   You can re-initialize the database by adding --with-db-init flag when running tests.
   ============================================================================= warnings summary ==============================================================================
   airflow/configuration.py:407
     /opt/airflow/airflow/configuration.py:407: FutureWarning: The 'dag_default_view' setting in [webserver] has the old default value of 'tree'. This value has been changed to 'grid' in the running config, but please update your config before Apache Airflow 3.0.
       warnings.warn(
   
   airflow/configuration.py:407
     /opt/airflow/airflow/configuration.py:407: FutureWarning: The 'log_filename_template' setting in [logging] has the old default value of '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'. This value has been changed to 'dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log' in the running config, but please update your config before Apache Airflow 3.0.
       warnings.warn(
   
   -- Docs: https://docs.pytest.org/en/stable/warnings.html
   ========================================================================== short test summary info ==========================================================================
   FAILED tests/providers/sqlite/hooks/test_sqlite.py::TestSqliteHook::test_get_pandas_df_chunksize - sqlite3.ProgrammingError: Cannot operate on a closed database.
   ================================================================ 1 failed, 8 deselected, 2 warnings in 0.90s ================================================================
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org