Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/16 14:46:47 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #20894: Obtain lock for update when scheduling tis

ephraimbuddy opened a new pull request #20894:
URL: https://github.com/apache/airflow/pull/20894


   Not sure how we get a deadlock in this method even with one scheduler,
   so I'm suggesting we obtain a lock for update here.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk edited a comment on pull request #20894: Obtain lock for update when scheduling tis

potiuk edited a comment on pull request #20894:
URL: https://github.com/apache/airflow/pull/20894#issuecomment-1019538985


   Yeah. I can confirm that this change has not changed anything, as I suspected, @ephraimbuddy. You can test it yourself. This is what I did (latest Breeze + Postgres):
   
   ```
   export AIRFLOW__CORE__SQL_ALCHEMY_ENGINE_ARGS='{"echo": true}'
   ```
   ^^ This enables echoing of the queries generated by SQLAlchemy.
   
   ```
   pytest tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_callbacks_are_called -s
   ```
   ^^ This is one of the tests that reaches this query.
   
   ```
   UPDATE task_instance SET start_date=%(start_date)s, end_date=%(end_date)s, duration=%(duration)s, state=%(state)s
   WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id
   IN (%(task_id_1)s)
   
   {'start_date': datetime.datetime(2022, 1, 23, 18, 1, 14, 880695, tzinfo=Timezone('UTC')),
   'end_date': datetime.datetime(2022, 1, 23, 18, 1, 14, 880701, tzinfo=Timezone('UTC')),
   'duration': 0, 'state': <TaskInstanceState.SUCCESS: 'success'>,
   'dag_id_1': 'test_task_start_date_scheduling',
   'run_id_1': 'scheduled__2016-01-01T00:00:00+00:00', 'task_id_1': 'dummy2'}
   ```
   ^^ This was the query generated (as you can see, there is no extra lock here). To make it easy to find, I just added `raise Exception` right after the query, so this was the last query printed.
   
   I reverted the commit "Obtain lock for update when scheduling tis" and re-ran it:
   
   ```
   UPDATE task_instance SET start_date=%(start_date)s, end_date=%(end_date)s, duration=%(duration)s, state=%(state)s
   WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id 
   IN (%(task_id_1)s)
   
   {'start_date': datetime.datetime(2022, 1, 23, 18, 11, 22, 12081, tzinfo=Timezone('UTC')), 
   'end_date': datetime.datetime(2022, 1, 23, 18, 11, 22, 12087, tzinfo=Timezone('UTC')), 
   'duration': 0, 'state': <TaskInstanceState.SUCCESS: 'success'>, 
   'dag_id_1': 'test_task_start_date_scheduling', 
   'run_id_1': 'scheduled__2016-01-01T00:00:00+00:00', 'task_id_1': 'dummy2'}
   ```
   ^^ As you can see, SQLAlchemy generates the very same query whether or not we request the row lock. Exactly as I suspected: SQLAlchemy simply ignores `with_for_update()` here. I will re-open the original issue, because IMHO this change has no chance of fixing it: it does not change anything :(.
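A minimal sketch of the same "look at the generated SQL" technique outside Airflow, assuming SQLAlchemy 1.4 or newer. It assumes, for illustration only, that the JSON value of `AIRFLOW__CORE__SQL_ALCHEMY_ENGINE_ARGS` is passed to `create_engine()` as keyword arguments, and an in-memory SQLite database stands in for Postgres. Besides `echo=True` printing each statement, a `before_cursor_execute` listener keeps the emitted SQL in a list, which is handy when you want to assert on it in a test instead of reading the log. The names `engine_args`, `captured_statements` and `record_statement` are hypothetical.

```python
import json
import os

from sqlalchemy import create_engine, event, text

# Assumption for illustration: the setting's JSON value is passed to
# create_engine() as keyword arguments. setdefault() keeps any existing value.
os.environ.setdefault("AIRFLOW__CORE__SQL_ALCHEMY_ENGINE_ARGS", '{"echo": true}')
engine_args = json.loads(os.environ["AIRFLOW__CORE__SQL_ALCHEMY_ENGINE_ARGS"])

# In-memory SQLite stands in for the Postgres database used in Breeze.
engine = create_engine("sqlite://", **engine_args)

# echo=True logs every statement; this listener also keeps a copy of each
# statement so it can be inspected programmatically.
captured_statements = []


@event.listens_for(engine, "before_cursor_execute")
def record_statement(conn, cursor, statement, parameters, context, executemany):
    captured_statements.append(statement)


with engine.connect() as conn:
    conn.execute(text("SELECT 1"))

print(captured_statements)
```

In a test, asserting on `captured_statements` (for example, that no statement contains `FOR UPDATE`) makes the comparison above repeatable without adding a temporary `raise Exception`.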


[GitHub] [airflow] potiuk edited a comment on pull request #20894: Obtain lock for update when scheduling tis

potiuk edited a comment on pull request #20894:
URL: https://github.com/apache/airflow/pull/20894#issuecomment-1016322047


   I am rather sceptical that it changes anything (unless I missed something).
   
   What this change effectively does is attempt to run:
   
   ```
   UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s
   AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id IN () FOR UPDATE;
   ```
   
   I think this does not actually happen, because it makes no sense: the FOR UPDATE clause only makes sense in SELECT queries. An UPDATE query always takes row locks on the rows it modifies when it runs, so I think SQLAlchemy will simply silently ignore the `with_for_update()` call. (This is also the main reason why the deadlock happens: the UPDATE query attempts to lock the TASK_INSTANCE rows exclusively and fails.)
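To see the SELECT vs. UPDATE distinction directly, here is a minimal sketch, assuming SQLAlchemy 1.4 or newer, that compiles both kinds of statement for the PostgreSQL dialect without needing a database. The `task_instance` table below is a hypothetical, trimmed-down stand-in with only the columns used in the query above, not Airflow's real model. `with_for_update()` renders `FOR UPDATE` on the SELECT, while the Core `update()` construct offers no such method and compiles to a plain `UPDATE ... WHERE ...`, the same shape as the echoed query.

```python
from sqlalchemy import Column, MetaData, String, Table, select, update
from sqlalchemy.dialects import postgresql

# Hypothetical, trimmed-down table: only the columns used in the queries above.
metadata = MetaData()
task_instance = Table(
    "task_instance",
    metadata,
    Column("dag_id", String, primary_key=True),
    Column("run_id", String, primary_key=True),
    Column("task_id", String, primary_key=True),
    Column("state", String),
)

pg = postgresql.dialect()

# FOR UPDATE is an option of SELECT statements.
select_sql = str(
    select(task_instance.c.state)
    .where(task_instance.c.dag_id == "example_dag")
    .with_for_update()
    .compile(dialect=pg)
)

# The Core UPDATE construct has no with_for_update(); the UPDATE itself
# locks the rows it modifies when it runs.
update_stmt = (
    update(task_instance)
    .where(task_instance.c.dag_id == "example_dag")
    .values(state="scheduled")
)
update_sql = str(update_stmt.compile(dialect=pg))

print(select_sql)
print(update_sql)
```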
   
   What I think we really need to fix is the OTHER query that causes it (but we need the logs from the server to find out what that other query was). As I understand it, this piece of code should be protected by the earlier locking of the DAG_RUN row. I believe the deadlock is caused by another query that locks TASK_INSTANCE rows for that DAG_RUN; this should not happen, and we should fix that other query.
   
   There are two ways this could happen:
   
   1) The other query does not lock the DAG_RUN row "deliberately" 
   2) We have a commit somewhere that releases the DAG_RUN lock
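The locking protocol described above can be illustrated with a toy analogy in plain Python threads. This is not database code, and all names are hypothetical: `dag_run_lock` plays the role of the DAG_RUN row lock and `ti_lock` the TASK_INSTANCE row locks. If the other writer respects the protocol and backs off when the DAG_RUN is already taken (similar in spirit to SKIP LOCKED), the scheduler gets the TI locks; if it locks the TI rows directly, the DAG_RUN lock protects nothing and the scheduler ends up waiting on it.

```python
import threading


def scheduler_can_lock_tis(other_writer_takes_dag_run_lock: bool) -> bool:
    """Toy analogy: can the 'scheduler' lock the TIs of a DAG run whose lock it holds?"""
    dag_run_lock = threading.Lock()  # plays the DAG_RUN row lock
    ti_lock = threading.Lock()  # plays the TASK_INSTANCE row locks
    scheduler_holds_dag_run = threading.Event()
    other_writer_ready = threading.Event()
    scheduler_done = threading.Event()
    result = {}

    def scheduler():
        with dag_run_lock:
            scheduler_holds_dag_run.set()
            other_writer_ready.wait()
            # Give up after a short wait instead of blocking forever.
            got_tis = ti_lock.acquire(timeout=0.5)
            result["got_tis"] = got_tis
            if got_tis:
                ti_lock.release()
        scheduler_done.set()

    def other_writer():
        scheduler_holds_dag_run.wait()
        if other_writer_takes_dag_run_lock:
            # Follows the protocol: DAG_RUN first, and skip it if it is taken
            # (similar in spirit to SKIP LOCKED).
            if dag_run_lock.acquire(blocking=False):
                with ti_lock:  # not reached here: the scheduler holds DAG_RUN
                    pass
                dag_run_lock.release()
            other_writer_ready.set()
        else:
            # Breaks the protocol: locks the TI rows without the DAG_RUN lock
            # and keeps them until the scheduler has finished.
            ti_lock.acquire()
            other_writer_ready.set()
            scheduler_done.wait()
            ti_lock.release()

    threads = [threading.Thread(target=scheduler), threading.Thread(target=other_writer)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return result["got_tis"]


print(scheduler_can_lock_tis(other_writer_takes_dag_run_lock=True))   # True
print(scheduler_can_lock_tis(other_writer_takes_dag_run_lock=False))  # False
```

The analogy only shows blocking; a real deadlock additionally needs the other transaction to wait on a row that the scheduler already holds.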
   
   I initially suspected the mini-scheduler, but I think it must be something else (the mini-scheduler correctly locks the DAG_RUN, and I could not find any place where it would release that lock).
   
   Another possibility is an API call or UI action that updates the TASK_INSTANCE table.
   
   UPDATE: I thought `synchronize_session=False` might have something to do with it, but I looked up the docs: it only affects how objects already loaded in the current session are updated. As I understand it, the UPDATE query itself runs the same regardless.
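A minimal sketch of that behaviour, assuming SQLAlchemy 1.4 or newer and using a hypothetical `TaskInstance` model on in-memory SQLite (not Airflow's real model). The bulk UPDATE is executed either way and reports the matched row count, but with `synchronize_session=False` an object that was already loaded into the session keeps its old `state` until it is refreshed or expired.

```python
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class TaskInstance(Base):
    # Hypothetical stand-in for Airflow's TaskInstance model (not the real one).
    __tablename__ = "task_instance"
    dag_id = Column(String, primary_key=True)
    run_id = Column(String, primary_key=True)
    task_id = Column(String, primary_key=True)
    state = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(TaskInstance(dag_id="d", run_id="r", task_id="t1", state="queued"))
    session.commit()

    ti = session.query(TaskInstance).filter_by(task_id="t1").one()

    # The bulk UPDATE is emitted and executed in the database either way...
    updated_rows = (
        session.query(TaskInstance)
        .filter(TaskInstance.dag_id == "d", TaskInstance.task_id.in_(["t1"]))
        .update({TaskInstance.state: "scheduled"}, synchronize_session=False)
    )
    # ...but with synchronize_session=False the already-loaded object is not touched.
    state_in_session = ti.state

    session.refresh(ti)  # re-reads the row from the database
    state_in_db = ti.state

print(updated_rows, state_in_session, state_in_db)  # 1 queued scheduled
```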


[GitHub] [airflow] potiuk edited a comment on pull request #20894: Obtain lock for update when scheduling tis

potiuk edited a comment on pull request #20894:
URL: https://github.com/apache/airflow/pull/20894#issuecomment-1019549288


   > Wow. Not aware of the {"echo": true}. Thanks very much. I will look into this again
   
   Pretty handy :). I actually just learned about it myself while looking into this.
   
   I thought a bit about this, and I think this might actually not originate in Airflow code at all. It really looks like it could be caused by someone running some manual UI operations and not committing them. It feels strange that Task Instance rows get locked without the DagRun lock being held, so it almost feels like either custom code or manual operations by users. I spent quite some time looking for a potential path where the TI rows could be locked, but honestly, I could not find anything.
   
   So I'd really insist that the users who see it provide us with more details, particularly what the "other" query that locks the instances is. I think the code that gets the deadlock is "correct"; it's the "other" code that is at fault (but we have no idea what the other code is, or whether it is code or a human).
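One way users could capture the "other" query while a scheduler is stuck waiting on a lock is to ask PostgreSQL who is blocking whom. A minimal sketch, assuming PostgreSQL 9.6+ (for `pg_blocking_pids()`) and a SQLAlchemy `Connection` to the metadata database; `find_blocking_queries` and `BLOCKING_QUERIES_SQL` are hypothetical names, not part of Airflow. Note that `pg_stat_activity.query` shows the blocking session's most recent statement, which is not necessarily the one that took the lock, and a `state` of `idle in transaction` points at an open, uncommitted transaction such as the manual operation suspected above. For deadlocks that PostgreSQL has already resolved, the deadlock report in the server log is the better source.

```python
from sqlalchemy import text

# Assumes PostgreSQL 9.6+: pairs every waiting session with the session(s)
# holding the lock it waits for, including the text of both queries.
BLOCKING_QUERIES_SQL = text(
    """
    SELECT blocked.pid    AS blocked_pid,
           blocked.query  AS blocked_query,
           blocking.pid   AS blocking_pid,
           blocking.state AS blocking_state,
           blocking.query AS blocking_query
    FROM pg_stat_activity AS blocked
    JOIN pg_stat_activity AS blocking
      ON blocking.pid = ANY(pg_blocking_pids(blocked.pid))
    """
)


def find_blocking_queries(connection):
    """Return (blocked_pid, blocked_query, blocking_pid, blocking_state, blocking_query) tuples.

    `connection` is assumed to be a SQLAlchemy Connection to the Postgres metadata DB.
    """
    return [tuple(row) for row in connection.execute(BLOCKING_QUERIES_SQL)]
```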


[GitHub] [airflow] potiuk edited a comment on pull request #20894: Obtain lock for update when scheduling tis

potiuk edited a comment on pull request #20894:
URL: https://github.com/apache/airflow/pull/20894#issuecomment-1016765008


   > What made me try this change was because a similar thing happened with multiple schedulers and #18975 solved it.
   
   Yeah, but this is really the SELECT vs. UPDATE difference. #18975 was a SELECT query:
   
   ```
   tis: List[TI] = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')).all()
   ```
   
   Whereas here we have an UPDATE query:
   
   ```
   (
       session.query(TI)
       .filter(
           TI.dag_id == self.dag_id,
           TI.run_id == self.run_id,
           TI.task_id.in_(schedulable_ti_ids),
       )
       .update({TI.state: State.SCHEDULED}, synchronize_session=False)
   )
   ```
   
   An UPDATE query obtains locks automatically on all the rows it accesses (the first thing it does is try to lock all of them), so `with_row_lock` has no effect here.
   
   > I'm of the opinion that if dag_run lock is the problem, then the log would be around dagrun and not task_instance
   
   The dag_run is not the problem. In theory (that is how the DAG_RUN lock was designed), you should not even attempt to get a lock on any TASK_INSTANCE row before obtaining the DAG_RUN lock. This is what the `with_row_locks` SELECT ... FOR UPDATE SKIP LOCKED on DAG_RUN, a few methods up the stack, should be doing before we get here.
   
   This is what `next_dagruns_to_examine` does:
   
   ```
   query = (
       session.query(cls)
       .filter(cls.state == state, cls.run_type != DagRunType.BACKFILL_JOB)
       .join(DagModel, DagModel.dag_id == cls.dag_id)
       .filter(DagModel.is_paused == false(), DagModel.is_active == true())
   )
   # ...
   
   return with_row_locks(
       query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
   )
   ```
   
   @ashb, is that correct?
   
   What happens here: we hold the DAG_RUN lock from a few methods up, so we should be safe to update the TASK_INSTANCE rows. But for some reason someone else who does not hold the DAG_RUN lock also tries to update those instances (or runs SELECT ... FOR UPDATE on them).
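For reference, a minimal sketch, assuming SQLAlchemy 1.4 or newer, of the kind of SQL that locking SELECT is meant to produce on PostgreSQL. It uses plain `select().with_for_update(of=..., skip_locked=True)` rather than Airflow's `with_row_locks`/`skip_locked` helpers, and the `dag_run` table is a hypothetical, trimmed-down stand-in. `FOR UPDATE OF dag_run SKIP LOCKED` locks only the selected DAG_RUN rows and lets another scheduler skip them instead of waiting, which is what is meant to keep schedulers from contending for the same TASK_INSTANCE rows.

```python
from sqlalchemy import Column, MetaData, String, Table, select
from sqlalchemy.dialects import postgresql

# Hypothetical, trimmed-down dag_run table (only what the sketch needs).
metadata = MetaData()
dag_run = Table(
    "dag_run",
    metadata,
    Column("dag_id", String, primary_key=True),
    Column("run_id", String, primary_key=True),
    Column("state", String),
)

# Roughly the shape of the locking SELECT: lock up to N running DAG runs,
# skipping rows that another scheduler already holds.
stmt = (
    select(dag_run)
    .where(dag_run.c.state == "running")
    .limit(10)
    .with_for_update(of=dag_run, skip_locked=True)
)
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```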
   
   



[GitHub] [airflow] github-actions[bot] commented on pull request #20894: Obtain lock for update when scheduling tis

github-actions[bot] commented on pull request #20894:
URL: https://github.com/apache/airflow/pull/20894#issuecomment-1015894105


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.


[GitHub] [airflow] ephraimbuddy commented on pull request #20894: Obtain lock for update when scheduling tis

ephraimbuddy commented on pull request #20894:
URL: https://github.com/apache/airflow/pull/20894#issuecomment-1019547330


   Wow. I wasn't aware of `{"echo": true}`. Thanks very much.
   I will look into this again.


[GitHub] [airflow] stablum commented on pull request #20894: Obtain lock for update when scheduling tis

stablum commented on pull request #20894:
URL: https://github.com/apache/airflow/pull/20894#issuecomment-1080263259


   Hello,
   
   Unfortunately, I'm still getting deadlocks: https://github.com/apache/airflow/issues/19957#issuecomment-1079965417


[GitHub] [airflow] potiuk edited a comment on pull request #20894: Obtain lock for update when scheduling tis

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #20894:
URL: https://github.com/apache/airflow/pull/20894#issuecomment-1016322047


   I am rather sceptical if it changes anything (unless I missed something).
   
   What this change effectively does it would attempt to run:
   
   ```
   UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s
   AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id IN () FOR UPDATE;'
   ```
   
   Which I think is not happening anyway because it makes no sense. The FOR UPDATE clause only makes sense in select queries.  Row lock happens always when your UPDATE query, so I think sqlalchemy will simply silently ignore the wiht_for_update() call. 
   
   What I think we really need to fix for this query is fix the OTHER query that causes it (but we need the logs from server to find out what the other query was). From how I understand here this piece of code should be protected by earlier locking of "DAG_RUN" row. I believe the deadlock is caused by another query that locks TASK_INSTANCE rows for that DAG_RUN -  and this should not happen and we should fix the other query.
   
   There are two reasons it could happen:
   
   1) The other query does not lock the DAG_RUN row "deliberately" 
   2) We have a commit somewhere that releases the DAG_RUN lock
   
   I suspected initially that it's mini-scheduler, but I think it must be something else (mini-scheduler correctly  locks the DAG_RUN and I could not find any place where it would release it.
   
   One other reason is that it could be an API call or UI action that updates the task_instances.
   
   UPDATE: I thought `synchronize_session = False` might have something to do with it but I looked up the docs, and it looks it only affects updating the objects kept in the current session, the UPDATE query would run anyway the same as I understand it.





[GitHub] [airflow] ephraimbuddy merged pull request #20894: Obtain lock for update when scheduling tis

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #20894:
URL: https://github.com/apache/airflow/pull/20894


   








[GitHub] [airflow] ephraimbuddy commented on pull request #20894: Obtain lock for update when scheduling tis

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #20894:
URL: https://github.com/apache/airflow/pull/20894#issuecomment-1016743065









[GitHub] [airflow] potiuk commented on pull request #20894: Obtain lock for update when scheduling tis

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20894:
URL: https://github.com/apache/airflow/pull/20894#issuecomment-1016765008


   > What made me try this change was because a similar thing happened with multiple schedulers and #18975 solved it.
   
   Yeah, but the real difference here is SELECT vs. UPDATE. The query in #18975 was a SELECT:
   
   ```
   tis: List[TI] = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')).all()
   ```
   
   whereas here we have an UPDATE query:
   
   ```
   (
       session.query(TI)
       .filter(
           TI.dag_id == self.dag_id,
           TI.run_id == self.run_id,
           TI.task_id.in_(schedulable_ti_ids),
       )
       .update({TI.state: State.SCHEDULED}, synchronize_session=False)
   )
   ```
   
   An UPDATE query automatically obtains locks on all the rows it accesses (that is the first thing it does), so `with_row_locks` has no effect here.
   
   > I'm of the opinion that if dag_run lock is the problem, then the log would be around dagrun and not task_instance
   
   The dag_run is not the problem. In theory (that is how the DAG_RUN lock was designed), nothing should even attempt to lock a TASK_INSTANCE row without first obtaining the lock on its DAG_RUN row. That is what the `with_row_locks` SELECT ... FROM DAG_RUN ... SKIP LOCKED a few methods up the stack should ensure before we get here.
   
   This is what `next_dagruns_to_examine` does:
   
   ```
   query = (
       session.query(cls)
       .filter(cls.state == state, cls.run_type != DagRunType.BACKFILL_JOB)
       .join(DagModel, DagModel.dag_id == cls.dag_id)
       .filter(DagModel.is_paused == false(), DagModel.is_active == true())
   )
   ...

   return with_row_locks(
       query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
   )
   ```
   
   @ashb, is that correct?
   
   What happens here: we hold the DAG_RUN lock obtained a few methods up, so it should be safe to update the TASK_INSTANCE rows, but for some reason something else that does not hold the DAG_RUN lock also tries to update those rows.
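A toy model of what `FOR UPDATE SKIP LOCKED` gives `next_dagruns_to_examine`, written in plain Python with non-blocking `threading.Lock` acquisition standing in for row locks. The `DagRunRow` class and the `select_for_update_skip_locked` and `commit` functions are hypothetical illustrations, not Airflow or SQLAlchemy APIs. Each scheduler locks only the DAG_RUN rows nobody else holds, so two schedulers end up with disjoint sets of runs and neither waits on the other.

```python
import threading
from typing import List


class DagRunRow:
    """Hypothetical stand-in for a dag_run row together with its row lock."""

    def __init__(self, run_id: str) -> None:
        self.run_id = run_id
        self.lock = threading.Lock()


def select_for_update_skip_locked(rows: List[DagRunRow], limit: int) -> List[DagRunRow]:
    """Lock up to `limit` rows, skipping any row another 'transaction' already holds."""
    locked: List[DagRunRow] = []
    for row in rows:
        if len(locked) == limit:
            break
        # blocking=False mirrors SKIP LOCKED: never wait, just move on to the next row.
        if row.lock.acquire(blocking=False):
            locked.append(row)
    return locked


def commit(locked: List[DagRunRow]) -> None:
    """Release the row locks, as ending the transaction would."""
    for row in locked:
        row.lock.release()


rows = [DagRunRow(f"run_{i}") for i in range(5)]
scheduler_a = select_for_update_skip_locked(rows, limit=3)
scheduler_b = select_for_update_skip_locked(rows, limit=3)
print([r.run_id for r in scheduler_a])  # ['run_0', 'run_1', 'run_2']
print([r.run_id for r in scheduler_b])  # ['run_3', 'run_4']
commit(scheduler_a)
commit(scheduler_b)
```

The guarantee only covers writers that go through this step; any code path that updates TASK_INSTANCE rows without first locking the DAG_RUN row sits outside it.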
   
   

