Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/08 04:57:05 UTC

[GitHub] [airflow] broholens opened a new issue, #24313: when i updated the dag, task in task_instance didn't update

broholens opened a new issue, #24313:
URL: https://github.com/apache/airflow/issues/24313

   ### Apache Airflow version
   
   2.3.2 (latest released)
   
   ### What happened
   
   A DAG is assigned to pool1, and the tasks in `task_instance` for this DAG run in pool1. After I changed the DAG's pool to pool2, the tasks in `task_instance` for this DAG still ran in pool1.
   
   I checked the `serialized_dag` row for this DAG, and the data had already been updated to pool2.
   
   So I read the code and found what may be a bug in the `_add_dag_from_db` function of `dagbag.py`. It occurs with very low probability.
   ```python
       def _add_dag_from_db(self, dag_id: str, session: Session):
           """Add DAG to DagBag from DB"""
           from airflow.models.serialized_dag import SerializedDagModel
   
           row = SerializedDagModel.get(dag_id, session)
           if not row:
               return None
   
           row.load_op_links = self.load_op_links
           dag = row.dag
           for subdag in dag.subdags:
               self.dags[subdag.dag_id] = subdag
           self.dags[dag.dag_id] = dag
           self.dags_last_fetched[dag.dag_id] = timezone.utcnow()
           self.dags_hash[dag.dag_id] = row.dag_hash
   ```
   Consider this situation:
   after `row = SerializedDagModel.get(dag_id, session)` has executed but before `self.dags_last_fetched[dag.dag_id] = timezone.utcnow()` has executed, another process updates this DAG's row in the `serialized_dag` table. `self.dags` then stores the old DAG, and `self.dags_last_fetched` is later than the `serialized_dag` update time, so the code below never executes for that update:
   ```python
   if sd_last_updated_datetime > self.dags_last_fetched[dag_id]:
       self._add_dag_from_db(dag_id=dag_id, session=session)
   ```
   #### So the DAG in memory will never be updated.
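
The interleaving described above can be replayed deterministically with a toy model. This is only a sketch: `FakeSerializedDagRow` and `ToyDagBag` are hypothetical stand-ins, not Airflow classes, and the timestamps are hard-coded so that no real threads or database are needed. Only the comparison in `needs_refresh` mirrors the check quoted above.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass
class FakeSerializedDagRow:
    """Hypothetical stand-in for one row of the serialized_dag table."""
    pool: str
    last_updated: datetime


@dataclass
class ToyDagBag:
    """Hypothetical cache that models only the timestamp bookkeeping."""
    dags: dict = field(default_factory=dict)
    dags_last_fetched: dict = field(default_factory=dict)

    def needs_refresh(self, dag_id, row):
        # Mirrors: sd_last_updated_datetime > self.dags_last_fetched[dag_id]
        return row.last_updated > self.dags_last_fetched[dag_id]


t0 = datetime(2022, 6, 8, 4, 0, 0, tzinfo=timezone.utc)
db = {"my_dag": FakeSerializedDagRow(pool="pool1", last_updated=t0)}
bag = ToyDagBag()

# Step 1 (t0 + 1s): the reader fetches the row and sees pool1.
snapshot = FakeSerializedDagRow(db["my_dag"].pool, db["my_dag"].last_updated)

# Step 2 (t0 + 2s): another process updates the row to pool2.
db["my_dag"] = FakeSerializedDagRow("pool2", t0 + timedelta(seconds=2))

# Step 3 (t0 + 3s): the reader caches the stale snapshot, but stamps it
# with the current time, which is later than the update in step 2.
bag.dags["my_dag"] = snapshot
bag.dags_last_fetched["my_dag"] = t0 + timedelta(seconds=3)

# The refresh check now reports "up to date" although the cache holds pool1.
stale_forever = not bag.needs_refresh("my_dag", db["my_dag"])
print(bag.dags["my_dag"].pool, stale_forever)
```

In this model the cached entry stays on pool1 until the row is written again with a newer timestamp.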
   
   
   I think this code will fix the bug:
   ```python
       def _add_dag_from_db(self, dag_id: str, session: Session):
           """Add DAG to DagBag from DB"""
           from airflow.models.serialized_dag import SerializedDagModel
           
           # Capture the timestamp before reading, so a concurrent update is not missed
           last_fetched_time = timezone.utcnow()
           row = SerializedDagModel.get(dag_id, session)
           if not row:
               return None
   
           row.load_op_links = self.load_op_links
           dag = row.dag
           for subdag in dag.subdags:
               self.dags[subdag.dag_id] = subdag
           self.dags[dag.dag_id] = dag
           self.dags_last_fetched[dag.dag_id] = last_fetched_time
           self.dags_hash[dag.dag_id] = row.dag_hash
   ```
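
The effect of taking the timestamp before the read can be checked with a similar toy model. Again this is a sketch under assumptions, not Airflow code: `TickClock`, `fetch_with_early_timestamp`, and the dictionary `db` are hypothetical, and the model assumes the reader and the writer share a consistent clock.

```python
from datetime import datetime, timedelta, timezone


class TickClock:
    """Hypothetical clock that advances one second on every call."""

    def __init__(self, start):
        self.current = start

    def now(self):
        self.current += timedelta(seconds=1)
        return self.current


def fetch_with_early_timestamp(read_row, now):
    """Record the fetch time before reading, as in the proposed change."""
    fetched_at = now()
    row = read_row()
    return row, fetched_at


t0 = datetime(2022, 6, 8, 4, 0, 0, tzinfo=timezone.utc)
clock = TickClock(t0)
db = {"pool": "pool1", "last_updated": t0}


def read_then_concurrent_write():
    # The reader copies the row, then another process updates it right after.
    snapshot = dict(db)
    db.update(pool="pool2", last_updated=clock.now())
    return snapshot


snapshot, fetched_at = fetch_with_early_timestamp(read_then_concurrent_write, clock.now)

# The update is stamped later than fetched_at, so the next check reloads the DAG.
needs_refresh = db["last_updated"] > fetched_at
print(snapshot["pool"], needs_refresh)
```

The trade-off in this design is an occasional redundant reload: an update that lands between the timestamp and the read is fetched once more on the next check, which is harmless compared with a stale cache entry.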
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   _No response_
   
   ### Operating System
   
   linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] boring-cyborg[bot] commented on issue #24313: when i updated the dag, task in task_instance didn't update

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #24313:
URL: https://github.com/apache/airflow/issues/24313#issuecomment-1149461127

   Thanks for opening your first issue here! Be sure to follow the issue template!
   




[GitHub] [airflow] potiuk commented on issue #24313: when i updated the dag, task in task_instance didn't update

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #24313:
URL: https://github.com/apache/airflow/issues/24313#issuecomment-1149705364

   Can you please make a PR with a fix and a unit test reproducing the case? It will be much easier to discuss once we see the PR and can talk about the actual code. In the worst case, we will close the PR.

