Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/08 04:57:05 UTC
[GitHub] [airflow] broholens opened a new issue, #24313: when i updated the dag, task in task_instance didn't update
broholens opened a new issue, #24313:
URL: https://github.com/apache/airflow/issues/24313
### Apache Airflow version
2.3.2 (latest released)
### What happened
There is a DAG that uses pool1, and the tasks in `task_instance` for this DAG run in pool1. When I change the DAG's pool to pool2, the tasks in `task_instance` for this DAG still run in pool1.
I checked the `serialized_dag` table for this DAG, and the data had already been updated to pool2.
So I read the code, and I think there may be a bug in the function `_add_dag_from_db` in `dagbag.py`. It happens with very low probability.
```python
def _add_dag_from_db(self, dag_id: str, session: Session):
    """Add DAG to DagBag from DB"""
    from airflow.models.serialized_dag import SerializedDagModel

    row = SerializedDagModel.get(dag_id, session)
    if not row:
        return None

    row.load_op_links = self.load_op_links
    dag = row.dag
    for subdag in dag.subdags:
        self.dags[subdag.dag_id] = subdag
    self.dags[dag.dag_id] = dag
    self.dags_last_fetched[dag.dag_id] = timezone.utcnow()
    self.dags_hash[dag.dag_id] = row.dag_hash
```
Consider the following interleaving:
after `row = SerializedDagModel.get(dag_id, session)` has executed, but before `self.dags_last_fetched[dag.dag_id] = timezone.utcnow()` runs, another process updates this DAG's row in the `serialized_dag` table. `self.dags` then stores the old DAG, while `self.dags_last_fetched` records a time later than the row's update time in `serialized_dag`, so the following refresh check never fires:
```python
if sd_last_updated_datetime > self.dags_last_fetched[dag_id]:
    self._add_dag_from_db(dag_id=dag_id, session=session)
```
#### So the DAG cached in memory is never refreshed (at least until its row is written again).
I think the following change would fix the bug:
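The interleaving described above can be replayed deterministically with a toy model. This is a minimal sketch, assuming a logical counter in place of `timezone.utcnow()` and a plain dict in place of the `serialized_dag` table; `LogicalClock`, `SerializedRow`, `ToyDagBag` and `other_process_updates` are hypothetical names, not Airflow APIs. The "other process" is simulated by a callback that runs between the read and the timestamp.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional


class LogicalClock:
    """Hypothetical monotonic counter standing in for timezone.utcnow()."""

    def __init__(self) -> None:
        self.now = 0

    def tick(self) -> int:
        self.now += 1
        return self.now


@dataclass
class SerializedRow:
    """Hypothetical stand-in for one row of the `serialized_dag` table."""

    pool: str
    last_updated: int  # logical time of the last write


@dataclass
class ToyDagBag:
    """Simplified model of the DagBag cache (not Airflow's real class)."""

    dags: Dict[str, str] = field(default_factory=dict)
    dags_last_fetched: Dict[str, int] = field(default_factory=dict)

    def add_dag_from_db(self, dag_id: str, table: Dict[str, SerializedRow],
                        clock: LogicalClock,
                        concurrent_write: Optional[Callable[[], None]] = None) -> None:
        row = table[dag_id]  # 1. read the row
        if concurrent_write is not None:
            concurrent_write()  # 2. another process updates the table here
        self.dags[dag_id] = row.pool  # caches the (now stale) row from step 1
        # 3. timestamp taken AFTER the read, mirroring the current code
        self.dags_last_fetched[dag_id] = clock.tick()

    def get_dag(self, dag_id: str, table: Dict[str, SerializedRow],
                clock: LogicalClock) -> str:
        # Same shape as the staleness check quoted above
        if table[dag_id].last_updated > self.dags_last_fetched[dag_id]:
            self.add_dag_from_db(dag_id, table, clock)
        return self.dags[dag_id]


clock = LogicalClock()
table = {"my_dag": SerializedRow("pool1", clock.tick())}  # written at t=1


def other_process_updates() -> None:
    table["my_dag"] = SerializedRow("pool2", clock.tick())  # written at t=2


bag = ToyDagBag()
bag.add_dag_from_db("my_dag", table, clock, concurrent_write=other_process_updates)
# dags_last_fetched is t=3, later than the t=2 write, so no refresh is triggered
result = bag.get_dag("my_dag", table, clock)
second = bag.get_dag("my_dag", table, clock)
print(result, second)  # pool1 pool1
```

Both lookups return `pool1` even though the table holds `pool2`, which matches the symptom in the report.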
```python
def _add_dag_from_db(self, dag_id: str, session: Session):
    """Add DAG to DagBag from DB"""
    from airflow.models.serialized_dag import SerializedDagModel

    # Record the fetch time BEFORE reading the row, so that any write landing
    # after this point has a newer update time and triggers a later refresh.
    last_fetched_time = timezone.utcnow()
    row = SerializedDagModel.get(dag_id, session)
    if not row:
        return None

    row.load_op_links = self.load_op_links
    dag = row.dag
    for subdag in dag.subdags:
        self.dags[subdag.dag_id] = subdag
    self.dags[dag.dag_id] = dag
    # Store the pre-read timestamp instead of calling utcnow() here
    self.dags_last_fetched[dag.dag_id] = last_fetched_time
    self.dags_hash[dag.dag_id] = row.dag_hash
```
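The effect of moving the timestamp can be checked with the same kind of toy model. This is a minimal sketch, assuming a logical counter in place of `timezone.utcnow()` and a dict in place of the `serialized_dag` table; `LogicalClock`, `SerializedRow`, `FixedToyDagBag` and `other_process_updates` are hypothetical names, not Airflow APIs. The only change from the buggy ordering is that the fetch time is captured before the row is read.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional


class LogicalClock:
    """Hypothetical monotonic counter standing in for timezone.utcnow()."""

    def __init__(self) -> None:
        self.now = 0

    def tick(self) -> int:
        self.now += 1
        return self.now


@dataclass
class SerializedRow:
    """Hypothetical stand-in for one row of the `serialized_dag` table."""

    pool: str
    last_updated: int  # logical time of the last write


@dataclass
class FixedToyDagBag:
    """Simplified DagBag cache using the proposed ordering (not Airflow's class)."""

    dags: Dict[str, str] = field(default_factory=dict)
    dags_last_fetched: Dict[str, int] = field(default_factory=dict)

    def add_dag_from_db(self, dag_id: str, table: Dict[str, SerializedRow],
                        clock: LogicalClock,
                        concurrent_write: Optional[Callable[[], None]] = None) -> None:
        last_fetched_time = clock.tick()  # captured BEFORE the read
        row = table[dag_id]
        if concurrent_write is not None:
            concurrent_write()  # another process updates the table here
        self.dags[dag_id] = row.pool
        self.dags_last_fetched[dag_id] = last_fetched_time

    def get_dag(self, dag_id: str, table: Dict[str, SerializedRow],
                clock: LogicalClock) -> str:
        if table[dag_id].last_updated > self.dags_last_fetched[dag_id]:
            self.add_dag_from_db(dag_id, table, clock)
        return self.dags[dag_id]


clock = LogicalClock()
table = {"my_dag": SerializedRow("pool1", clock.tick())}  # written at t=1


def other_process_updates() -> None:
    table["my_dag"] = SerializedRow("pool2", clock.tick())  # written at t=3


bag = FixedToyDagBag()
bag.add_dag_from_db("my_dag", table, clock, concurrent_write=other_process_updates)
# dags_last_fetched is t=2, older than the t=3 write, so the next lookup refetches
result = bag.get_dag("my_dag", table, clock)
print(result)  # pool2
```

The trade-off of this ordering is that a write racing with the fetch may cause one extra, redundant re-fetch, but the cache can no longer record a fetch time newer than data it has not actually read.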
### What you think should happen instead
_No response_
### How to reproduce
_No response_
### Operating System
Linux
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] boring-cyborg[bot] commented on issue #24313: when i updated the dag, task in task_instance didn't update
Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #24313:
URL: https://github.com/apache/airflow/issues/24313#issuecomment-1149461127
Thanks for opening your first issue here! Be sure to follow the issue template!
[GitHub] [airflow] potiuk commented on issue #24313: when i updated the dag, task in task_instance didn't update
Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #24313:
URL: https://github.com/apache/airflow/issues/24313#issuecomment-1149705364
Can you please make a PR with a fix and a unit test reproducing the case? I think this will be much easier to discuss when we see the PR and can discuss the actual code. Worst case, we will close the PR.