Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/26 15:57:57 UTC

[GitHub] [airflow] ashb opened a new pull request, #25312: Don't mistakenly take a lock on DagRun via ti.refresh_from_fb

ashb opened a new pull request, #25312:
URL: https://github.com/apache/airflow/pull/25312

   In 2.2.0 we made TI.dag_run be automatically join-loaded, which is fine
   for most cases, but for `refresh_from_db` we don't need that (we don't
   access anything under ti.dag_run). When `lock_for_update=True` is
   passed, the `SELECT ... FOR UPDATE` also locks the joined DagRun row, so
   we lock more than we want to and _might_ cause deadlocks.
   
   Even if it doesn't, selecting more than we need is wasteful.
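A minimal sketch of the problem, assuming SQLAlchemy 1.4 or later and hypothetical stripped-down `TI`/`DagRun` models (not Airflow's real ones). `lazy="joined"` stands in for the automatic join-loading described above, and `innerjoin=True` is an assumption of the sketch. Rendering the locking query, without any database, shows the joined `dag_run` table inside the same `SELECT ... FOR UPDATE`:

```python
from sqlalchemy import Column, ForeignKeyConstraint, Integer, String
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


# Hypothetical minimal models, not Airflow's real TaskInstance/DagRun.
class DagRun(Base):
    __tablename__ = "dag_run"
    dag_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)


class TI(Base):
    __tablename__ = "task_instance"
    __table_args__ = (
        ForeignKeyConstraint(["dag_id", "run_id"], ["dag_run.dag_id", "dag_run.run_id"]),
    )
    dag_id = Column(String(250), primary_key=True)
    task_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)
    map_index = Column(Integer, primary_key=True)
    state = Column(String(20))
    # Joined eager loading: every ORM query for TI also JOINs dag_run.
    dag_run = relationship("DagRun", lazy="joined", innerjoin=True)


# Building and rendering the query needs no database connection.
qry = (
    Session()
    .query(TI)
    .filter(
        TI.dag_id == "example_dag",
        TI.task_id == "task_1",
        TI.run_id == "manual__1",
        TI.map_index == -1,
    )
    .with_for_update()
)
sql = str(qry)
print(sql)  # inspect: the dag_run JOIN sits in the same locking SELECT
```

On PostgreSQL, a `FOR UPDATE` with no `OF` table list locks the matching rows of every table in the query, so the `dag_run` row is locked along with the task instance row.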


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on a diff in pull request #25312: Don't mistakenly take a lock on DagRun via ti.refresh_from_fb

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25312:
URL: https://github.com/apache/airflow/pull/25312#discussion_r931150798


##########
airflow/models/taskinstance.py:
##########
@@ -848,11 +848,15 @@ def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool
         """
         self.log.debug("Refreshing TaskInstance %s from DB", self)
 
-        qry = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.task_id == self.task_id,
-            TaskInstance.run_id == self.run_id,
-            TaskInstance.map_index == self.map_index,
+        qry = (
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == self.task_id,
+                TaskInstance.run_id == self.run_id,
+                TaskInstance.map_index == self.map_index,
+            )

Review Comment:
   I've left a comment. The best way I found to do it was `query(TI.__table__.columns)`, which will only ever select the specified columns and never any relationships. That way (together with the comment) I don't think the pre-commit check is needed.
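A minimal sketch of the column-only approach, again assuming SQLAlchemy 1.4+, hypothetical minimal models and an in-memory SQLite database purely for illustration; it only renders and runs a plain SELECT. It unpacks the columns with `*` when passing them to `query()`. Because only `Column` objects are selected, there is no mapped entity for the joined eager load to attach to, and the result is a row rather than a `TI` instance:

```python
from sqlalchemy import Column, ForeignKeyConstraint, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


# Hypothetical minimal models, not Airflow's real TaskInstance/DagRun.
class DagRun(Base):
    __tablename__ = "dag_run"
    dag_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)


class TI(Base):
    __tablename__ = "task_instance"
    __table_args__ = (
        ForeignKeyConstraint(["dag_id", "run_id"], ["dag_run.dag_id", "dag_run.run_id"]),
    )
    dag_id = Column(String(250), primary_key=True)
    task_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)
    map_index = Column(Integer, primary_key=True)
    state = Column(String(20))
    dag_run = relationship("DagRun", lazy="joined", innerjoin=True)


engine = create_engine("sqlite://")  # in-memory database, illustration only
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(DagRun(dag_id="example_dag", run_id="manual__1"))
    session.add(
        TI(dag_id="example_dag", task_id="task_1", run_id="manual__1", map_index=-1, state="running")
    )
    session.commit()

    # Select the table's columns instead of the TI entity: with no mapped
    # entity in the SELECT list, the joined eager load adds no JOIN.
    qry = session.query(*TI.__table__.columns).filter(
        TI.dag_id == "example_dag",
        TI.task_id == "task_1",
        TI.run_id == "manual__1",
        TI.map_index == -1,
    )
    sql = str(qry)
    row = qry.one()  # a Row with attribute access, not a TI object

print(sql)
print(row.state)
```

The trade-off is that the caller gets plain column values back and has to copy them onto the existing object itself.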




[GitHub] [airflow] jedcunningham commented on a diff in pull request #25312: Don't mistakenly take a lock on DagRun via ti.refresh_from_fb

jedcunningham commented on code in PR #25312:
URL: https://github.com/apache/airflow/pull/25312#discussion_r932466512


##########
airflow/models/taskinstance.py:
##########
@@ -848,28 +849,35 @@ def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool
         """
         self.log.debug("Refreshing TaskInstance %s from DB", self)
 
-        qry = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.task_id == self.task_id,
-            TaskInstance.run_id == self.run_id,
-            TaskInstance.map_index == self.map_index,
+        if self in session:
+            session.refresh(self, TaskInstance.__mapper__.column_attrs.keys())
+
+        qry = (
+            # To avoid joining any relationships by default select the all

Review Comment:
   ```suggestion
               # To avoid joining any relationships, by default select all
   ```
   
   Nit: it took me a couple of readings to sort this out.
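The hunk above also refreshes an already-attached instance with `session.refresh(self, TaskInstance.__mapper__.column_attrs.keys())`. A minimal sketch of what that key list holds, and how a refresh limited to it picks up a change made directly in the table, assuming SQLAlchemy 1.4+, hypothetical minimal models and an in-memory SQLite database:

```python
from sqlalchemy import Column, ForeignKeyConstraint, Integer, String, create_engine, update
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


# Hypothetical minimal models, not Airflow's real TaskInstance/DagRun.
class DagRun(Base):
    __tablename__ = "dag_run"
    dag_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)


class TI(Base):
    __tablename__ = "task_instance"
    __table_args__ = (
        ForeignKeyConstraint(["dag_id", "run_id"], ["dag_run.dag_id", "dag_run.run_id"]),
    )
    dag_id = Column(String(250), primary_key=True)
    task_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)
    map_index = Column(Integer, primary_key=True)
    state = Column(String(20))
    dag_run = relationship("DagRun", lazy="joined", innerjoin=True)


# Column-backed attributes only; the dag_run relationship is listed separately.
column_keys = TI.__mapper__.column_attrs.keys()
relationship_keys = TI.__mapper__.relationships.keys()
print(column_keys, relationship_keys)

engine = create_engine("sqlite://")  # in-memory database, illustration only
Base.metadata.create_all(engine)

# expire_on_commit=False keeps loaded values, so only the refresh updates them.
with Session(engine, expire_on_commit=False) as session:
    session.add(DagRun(dag_id="example_dag", run_id="manual__1"))
    ti = TI(dag_id="example_dag", task_id="task_1", run_id="manual__1", map_index=-1, state="queued")
    session.add(ti)
    session.commit()

    # Stand-in for another process: change the row via Core, bypassing the ORM object.
    session.execute(update(TI.__table__).values(state="running"))
    stale_state = ti.state  # still the in-memory value
    session.refresh(ti, column_keys)  # re-SELECT just the column attributes
    fresh_state = ti.state
```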




[GitHub] [airflow] ashb commented on pull request #25312: Don't mistakenly take a lock on DagRun via ti.refresh_from_fb

ashb commented on PR #25312:
URL: https://github.com/apache/airflow/pull/25312#issuecomment-1195671622

   @RNHTTR Can you see if this change does anything for the deadlock you are able to reproduce?



[GitHub] [airflow] potiuk commented on a diff in pull request #25312: Don't mistakenly take a lock on DagRun via ti.refresh_from_fb

potiuk commented on code in PR #25312:
URL: https://github.com/apache/airflow/pull/25312#discussion_r930156607


##########
airflow/models/taskinstance.py:
##########
@@ -848,11 +848,15 @@ def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool
         """
         self.log.debug("Refreshing TaskInstance %s from DB", self)
 
-        qry = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.task_id == self.task_id,
-            TaskInstance.run_id == self.run_id,
-            TaskInstance.map_index == self.map_index,
+        qry = (
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == self.task_id,
+                TaskInstance.run_id == self.run_id,
+                TaskInstance.map_index == self.map_index,
+            )

Review Comment:
   I guess it would be good to leave a comment explaining why we are doing it.




[GitHub] [airflow] potiuk commented on a diff in pull request #25312: Don't mistakenly take a lock on DagRun via ti.refresh_from_fb

potiuk commented on code in PR #25312:
URL: https://github.com/apache/airflow/pull/25312#discussion_r930336561


##########
airflow/models/taskinstance.py:
##########
@@ -848,11 +848,15 @@ def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool
         """
         self.log.debug("Refreshing TaskInstance %s from DB", self)
 
-        qry = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.task_id == self.task_id,
-            TaskInstance.run_id == self.run_id,
-            TaskInstance.map_index == self.map_index,
+        qry = (
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == self.task_id,
+                TaskInstance.run_id == self.run_id,
+                TaskInstance.map_index == self.map_index,
+            )

Review Comment:
   I guess (from the discussion) this "feature" of SQL, locking the other side of the join, is kinda undocumented and dangerous, and unexpected as well (it breaks the "explicit is better than implicit" Zen of Python), so I am afraid we might not find a "nicer" way of disabling it, because it feels like a hack.
   




Re: [PR] Don't mistakenly take a lock on DagRun via ti.refresh_from_fb [airflow]

Posted by "dstaple (via GitHub)" <gi...@apache.org>.
dstaple commented on PR #25312:
URL: https://github.com/apache/airflow/pull/25312#issuecomment-1768764027

   > /cc @potiuk Even if this doesn't fix the deadlock it's probably worth doing
   
   For future reference, this PR did indeed fix the specific deadlock issue from https://github.com/apache/airflow/issues/23361. Thanks again, folks.



[GitHub] [airflow] ashb commented on pull request #25312: Don't mistakenly take a lock on DagRun via ti.refresh_from_fb

ashb commented on PR #25312:
URL: https://github.com/apache/airflow/pull/25312#issuecomment-1195670753

   /cc @potiuk 
   



[GitHub] [airflow] ashb commented on a diff in pull request #25312: Don't mistakenly take a lock on DagRun via ti.refresh_from_fb

ashb commented on code in PR #25312:
URL: https://github.com/apache/airflow/pull/25312#discussion_r930161877


##########
airflow/models/taskinstance.py:
##########
@@ -848,11 +848,15 @@ def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool
         """
         self.log.debug("Refreshing TaskInstance %s from DB", self)
 
-        qry = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.task_id == self.task_id,
-            TaskInstance.run_id == self.run_id,
-            TaskInstance.map_index == self.map_index,
+        qry = (
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == self.task_id,
+                TaskInstance.run_id == self.run_id,
+                TaskInstance.map_index == self.map_index,
+            )

Review Comment:
   I wonder if there is a way of telling SQLA "only select this, never select anything else" (i.e. to prevent other regressions in the future).
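One SQLAlchemy feature that comes close to "only select this" is a wildcard loader option: `lazyload("*")` overrides the loader strategy of every relationship for a single query, including a mapper-level `lazy="joined"`. This is an alternative sketch, not what the PR ended up doing; it assumes SQLAlchemy 1.4+ and hypothetical minimal models. `raiseload("*")` is a stricter variant that makes any relationship access raise instead of loading.

```python
from sqlalchemy import Column, ForeignKeyConstraint, Integer, String
from sqlalchemy.orm import Session, declarative_base, lazyload, relationship

Base = declarative_base()


# Hypothetical minimal models, not Airflow's real TaskInstance/DagRun.
class DagRun(Base):
    __tablename__ = "dag_run"
    dag_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)


class TI(Base):
    __tablename__ = "task_instance"
    __table_args__ = (
        ForeignKeyConstraint(["dag_id", "run_id"], ["dag_run.dag_id", "dag_run.run_id"]),
    )
    dag_id = Column(String(250), primary_key=True)
    task_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)
    map_index = Column(Integer, primary_key=True)
    state = Column(String(20))
    dag_run = relationship("DagRun", lazy="joined", innerjoin=True)


base_query = Session().query(TI).filter(TI.dag_id == "example_dag").with_for_update()

# Default: the mapper-level lazy="joined" adds a JOIN to dag_run.
sql_default = str(base_query)
# Wildcard override for this query only: no relationship is eagerly joined,
# so the locking SELECT itself covers task_instance alone.
sql_no_join = str(base_query.options(lazyload("*")))
print(sql_no_join)
```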




[GitHub] [airflow] potiuk commented on a diff in pull request #25312: Don't mistakenly take a lock on DagRun via ti.refresh_from_fb

potiuk commented on code in PR #25312:
URL: https://github.com/apache/airflow/pull/25312#discussion_r930337550


##########
airflow/models/taskinstance.py:
##########
@@ -848,11 +848,15 @@ def refresh_from_db(self, session: Session = NEW_SESSION, lock_for_update: bool
         """
         self.log.debug("Refreshing TaskInstance %s from DB", self)
 
-        qry = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.task_id == self.task_id,
-            TaskInstance.run_id == self.run_id,
-            TaskInstance.map_index == self.map_index,
+        qry = (
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == self.task_id,
+                TaskInstance.run_id == self.run_id,
+                TaskInstance.map_index == self.map_index,
+            )

Review Comment:
   Maybe we should add some automation checking it (pre-commit). It should only matter when you select TaskInstance with a lock, which should be kinda possible to detect with some clever AST analysis. I think this one is rather disruptive, so we should go the extra mile to check it.
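A rough sketch of such a check using only the standard-library `ast` module. The function names and the heuristic are hypothetical: it flags method chains in which `.with_for_update()` is applied to a `.query(...)` whose positional arguments include the bare `TaskInstance` name. It will miss queries built into a variable (like `qry`) and locked later, so a real hook would need more work.

```python
from __future__ import annotations

import ast


def _innermost_query_args(node: ast.expr) -> list[ast.expr] | None:
    """Follow a method chain such as ``session.query(X).filter(...)`` inward
    and return the positional arguments of its ``.query(...)`` call, if any."""
    while isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
        if node.func.attr == "query":
            return node.args
        node = node.func.value
    return None


def find_locked_entity_queries(source: str, entity: str = "TaskInstance") -> list[int]:
    """Return line numbers where ``.with_for_update()`` is chained onto a
    ``.query(...)`` that selects the bare ``entity`` class (hypothetical check)."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "with_for_update"
        ):
            args = _innermost_query_args(node.func.value)
            # Only a bare Name counts; `*TaskInstance.__table__.columns` is a Starred node.
            if args and any(isinstance(arg, ast.Name) and arg.id == entity for arg in args):
                hits.append(node.lineno)
    return sorted(hits)


flagged = find_locked_entity_queries(
    "ti = session.query(TaskInstance).filter(TaskInstance.task_id == t).with_for_update().one()\n"
)
allowed = find_locked_entity_queries(
    "row = session.query(*TaskInstance.__table__.columns).filter(TaskInstance.task_id == t).with_for_update().one()\n"
)
print(flagged, allowed)
```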




[GitHub] [airflow] ashb merged pull request #25312: Don't mistakenly take a lock on DagRun via ti.refresh_from_fb

ashb merged PR #25312:
URL: https://github.com/apache/airflow/pull/25312


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org