Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/19 21:48:46 UTC
[GitHub] [airflow] michaelmicheal opened a new issue, #28480: `purge_inactive_dag_warnings` Unhandled Exception in DagFileProcessorManager
michaelmicheal opened a new issue, #28480:
URL: https://github.com/apache/airflow/issues/28480
### Apache Airflow version
2.5.0
### What happened
Airflow Version: 2.4.3
In one of our instances, after upgrading to Airflow 2.4.3, the [`purge_inactive_dag_warnings`](https://github.com/apache/airflow/blob/main/airflow/dag_processing/manager.py#L598) query started failing. It fails in only a subset of parsing loops, but a failure consistently occurs within 500 parsing loops.
```python
self._deactivate_stale_dags()
DagWarning.purge_inactive_dag_warnings()
refreshed_dag_dir = self._refresh_dag_dir()
```
This caused the DagFileProcessorManager to exit consistently before processing all DAG files.
```python
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 257, in _run_processor_manager
    processor_manager.start()
  File "/usr/local/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 489, in start
    return self._run_parsing_loop()
  File "/usr/local/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 609, in _run_parsing_loop
    DagWarning.purge_inactive_dag_warnings()
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/dagwarning.py", line 82, in purge_inactive_dag_warnings
    query.delete(synchronize_session=False)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 3191, in delete
    result = self.session.execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1689, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.9/site-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.9/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/usr/local/lib/python3.9/site-packages/MySQLdb/connections.py", line 254, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: DELETE FROM dag_warning USING dag_warning, dag WHERE dag_warning.dag_id = dag.dag_id AND dag.is_active = false]
```
### What you think should happen instead
As a workaround, we wrapped the query in a try-except block so that the exception no longer goes unhandled. I think we should:
1. Wrap the `purge_inactive_dag_warnings` call in a try-except block.
2. Consider not calling `purge_inactive_dag_warnings` on every parsing loop. Maybe we could call it once we've parsed all the DAG files.
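A minimal sketch of both suggestions combined, written as a standalone helper so it runs without Airflow. `GuardedPeriodicTask`, `maybe_run`, and the time-based interval are hypothetical names and choices made for illustration; the interval is only a stand-in for "after all DAG files have been parsed", which would need a hook into the manager's own state.

```python
import logging
import time

log = logging.getLogger(__name__)


class GuardedPeriodicTask:
    """Run a non-critical callable at most once per interval, never raising.

    Hypothetical helper for illustration; not part of Airflow.
    """

    def __init__(self, func, min_interval_seconds, clock=time.monotonic):
        self._func = func
        self._min_interval = min_interval_seconds
        self._clock = clock
        self._last_run = None  # None means the task has never run

    def maybe_run(self):
        """Return 'skipped', 'ok', or 'failed' for this loop iteration."""
        now = self._clock()
        if self._last_run is not None and now - self._last_run < self._min_interval:
            return "skipped"
        self._last_run = now
        try:
            self._func()
        except Exception:
            # Log the traceback, then let the parsing loop carry on.
            log.exception("Non-critical maintenance task failed; will retry later")
            return "failed"
        return "ok"
```

In the parsing loop, this could presumably wrap the existing call, for example `GuardedPeriodicTask(DagWarning.purge_inactive_dag_warnings, 300)` created once and `maybe_run()` invoked per iteration; the 300-second value is an arbitrary assumption, not a recommendation from the report.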
### How to reproduce
Hard to reproduce, but it will happen whenever the `purge_inactive_dag_warnings` query raises an exception.
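Since the real failure depends on the database dropping the connection, one way to approximate it is to simulate the failing call. The sketch below is a toy model, not Airflow code: `SimulatedConnectionLost`, `make_flaky_purge`, and `run_loops` are hypothetical names, and the exception class merely stands in for `sqlalchemy.exc.OperationalError`.

```python
class SimulatedConnectionLost(Exception):
    """Stand-in for sqlalchemy.exc.OperationalError (hypothetical, for illustration)."""


def make_flaky_purge(fail_on_call):
    """Return a purge stub that raises on the given 1-based call number."""
    state = {"calls": 0}

    def purge():
        state["calls"] += 1
        if state["calls"] == fail_on_call:
            raise SimulatedConnectionLost("Lost connection to MySQL server during query")

    return purge


def run_loops(purge, total_loops, guarded):
    """Run a toy parsing loop and return how many iterations completed."""
    completed = 0
    for _ in range(total_loops):
        if guarded:
            try:
                purge()
            except SimulatedConnectionLost:
                pass  # a real implementation would log the failure here
        else:
            purge()  # any exception propagates and ends the loop
        completed += 1
    return completed
```

Calling `run_loops(make_flaky_purge(3), 5, guarded=False)` lets the simulated error escape on the third iteration, mirroring the manager exiting early, while `guarded=True` finishes all five iterations. Against a real deployment, patching `DagWarning.purge_inactive_dag_warnings` with `unittest.mock.patch.object` and a `side_effect` would likely serve the same purpose, though that has not been verified here.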
### Operating System
Debian GNU/Linux 10 (buster)
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other 3rd-party Helm chart
### Deployment details
Other 3rd-party Helm chart
### Anything else
Airflow 2.4.3 on Kubernetes
MySQL Version: 8.0.18
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] michaelmicheal closed issue #28480: `purge_inactive_dag_warnings` Unhandled Exception in DagFileProcessorManager
Posted by GitBox <gi...@apache.org>.
michaelmicheal closed issue #28480: `purge_inactive_dag_warnings` Unhandled Exception in DagFileProcessorManager
URL: https://github.com/apache/airflow/issues/28480