Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/09 19:50:30 UTC

[GitHub] [airflow] michaelmicheal opened a new issue, #25627: Scheduler Critical Section Query Not Using Correct Index

michaelmicheal opened a new issue, #25627:
URL: https://github.com/apache/airflow/issues/25627

   ### Apache Airflow version
   
   Other Airflow 2 version
   
   ### What happened
   
   Airflow Version: 2.2.5
   MySQL Version: 8.0.18
   
   In the scheduler, we are seeing cases where MySQL chooses an inefficient plan for the [critical section task queuing query](https://github.com/apache/airflow/blob/2.2.5/airflow/jobs/scheduler_job.py#L294-L303). When a large number of task instances are scheduled, MySQL fails to use the `ti_state` index to filter the `task_instance` table and instead effectively reads the whole table (about 7.3 million rows).
   
   Normally, when the critical section query runs, MySQL uses the index on `task_instance.state` (`ti_state`) to filter scheduled task instances:
   ```text
   | -> Limit: 512 row(s)  (actual time=5.290..5.413 rows=205 loops=1)
       -> Sort row IDs: <temporary>.tmp_field_0, <temporary>.execution_date, limit input to 512 row(s) per chunk  (actual time=5.289..5.391 rows=205 loops=1)
           -> Table scan on <temporary>  (actual time=0.003..0.113 rows=205 loops=1)
               -> Temporary table  (actual time=5.107..5.236 rows=205 loops=1)
                   -> Nested loop inner join  (cost=20251.99 rows=1741) (actual time=0.100..4.242 rows=205 loops=1)
                       -> Nested loop inner join  (cost=161.70 rows=12) (actual time=0.071..2.436 rows=205 loops=1)
                           -> Index lookup on task_instance using ti_state (state='scheduled')  (cost=80.85 rows=231) (actual time=0.051..1.992 rows=222 loops=1)
                           -> Filter: ((dag_run.run_type <> 'backfill') and (dag_run.state = 'running'))  (cost=0.25 rows=0) (actual time=0.002..0.002 rows=1 loops=222)
                               -> Single-row index lookup on dag_run using dag_run_dag_id_run_id_key (dag_id=task_instance.dag_id, run_id=task_instance.run_id)  (cost=0.25 rows=1) (actual time=0.001..0.001 rows=1 loops=222)
                       -> Filter: ((dag.is_paused = 0) and (task_instance.dag_id = dag.dag_id))  (cost=233.52 rows=151) (actual time=0.008..0.008 rows=1 loops=205)
                           -> Index range scan on dag (re-planned for each iteration)  (cost=233.52 rows=15072) (actual time=0.008..0.008 rows=1 loops=205)
   1 row in set, 1 warning (0.03 sec)
   ```
   
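   When a large number of task instances are in the `scheduled` state at the same time, MySQL does not use the index on `task_instance.state`. Instead, it scans the `dag` table and, for each unpaused DAG, looks up that DAG's task instances through the `PRIMARY` key before filtering on `state`:
   
   ```text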
   | -> Limit: 512 row(s)  (actual time=12110.251..12110.573 rows=512 loops=1)
       -> Sort row IDs: <temporary>.tmp_field_0, <temporary>.execution_date, limit input to 512 row(s) per chunk  (actual time=12110.250..12110.526 rows=512 loops=1)
           -> Table scan on <temporary>  (actual time=0.005..0.800 rows=1176 loops=1)
               -> Temporary table  (actual time=12109.022..12109.940 rows=1176 loops=1)
                   -> Nested loop inner join  (cost=10807.83 rows=3) (actual time=1.328..12097.528 rows=1176 loops=1)
                       -> Nested loop inner join  (cost=10785.34 rows=64) (actual time=1.293..12084.371 rows=1193 loops=1)
                           -> Filter: (dag.is_paused = 0)  (cost=1371.40 rows=1285) (actual time=0.087..22.409 rows=13264 loops=1)
                               -> Table scan on dag  (cost=1371.40 rows=12854) (actual time=0.085..15.796 rows=13508 loops=1)
                           -> Filter: ((task_instance.state = 'scheduled') and (task_instance.dag_id = dag.dag_id))  (cost=0.32 rows=0) (actual time=0.907..0.909 rows=0 loops=13264)
                               -> Index lookup on task_instance using PRIMARY (dag_id=dag.dag_id)  (cost=0.32 rows=70) (actual time=0.009..0.845 rows=553 loops=13264)
                       -> Filter: ((dag_run.run_type <> 'backfill') and (dag_run.state = 'running'))  (cost=0.25 rows=0) (actual time=0.010..0.011 rows=1 loops=1193)
                           -> Single-row index lookup on dag_run using dag_run_dag_id_run_id_key (dag_id=task_instance.dag_id, run_id=task_instance.run_id)  (cost=0.25 rows=1) (actual time=0.009..0.010 rows=1 loops=1193)
   
   1 row in set, 1 warning (12.14 sec)
   ```
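MySQL's `EXPLAIN ANALYZE` reports `rows` and `actual time` as averages per loop, so the work done by the inner side of a nested-loop join is roughly that value multiplied by `loops`. The helper below is a hypothetical sketch (`loop_totals` is not part of Airflow or MySQL) that applies this rule to the `task_instance` steps of the two plans. In the slow plan, 553 rows × 13,264 loops comes to about 7.3 million rows, which matches the figure quoted above.

```python
import re
from typing import Tuple

# Measured part of an EXPLAIN ANALYZE line, e.g.
# "(actual time=0.009..0.845 rows=553 loops=13264)".
ACTUAL = re.compile(r"actual time=(\d+\.\d+)\.\.(\d+\.\d+) rows=(\d+) loops=(\d+)")


def loop_totals(plan_line: str) -> Tuple[int, float]:
    """Return approximate (total rows, total ms) for one plan step.

    Hypothetical helper. EXPLAIN ANALYZE prints per-loop averages, so
    both values are multiplied by the loop count. The results are only
    approximate because the printed averages are rounded.
    """
    match = ACTUAL.search(plan_line)
    if match is None:
        raise ValueError("line has no 'actual time=...' section")
    _first_row_ms, all_rows_ms, rows, loops = match.groups()
    return int(rows) * int(loops), float(all_rows_ms) * int(loops)


# task_instance steps copied from the fast and slow plans above.
fast = ("Index lookup on task_instance using ti_state (state='scheduled')  "
        "(cost=80.85 rows=231) (actual time=0.051..1.992 rows=222 loops=1)")
slow = ("Index lookup on task_instance using PRIMARY (dag_id=dag.dag_id)  "
        "(cost=0.32 rows=70) (actual time=0.009..0.845 rows=553 loops=13264)")

for label, line in (("ti_state", fast), ("PRIMARY", slow)):
    total_rows, total_ms = loop_totals(line)
    print(f"{label}: ~{total_rows:,} rows fetched in ~{total_ms:,.1f} ms")
```

By this estimate, the `PRIMARY` lookups alone take roughly 11.2 s of the 12.14 s query. In the fast plan, the `ti_state` lookup fetches 222 rows in about 2 ms.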
   
   ### What you think should happen instead
   
   To resolve this, I patched `scheduler_job.py` to add a MySQL index hint so that the query uses the `ti_state` index:
   ```diff
   --- /usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py
   +++ /usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py
   @@ -293,6 +293,7 @@ class SchedulerJob(BaseJob):
                # and the dag is not paused
                query = (
                    session.query(TI)
   +                .with_hint(TI, 'USE INDEX (ti_state)', dialect_name='mysql')
                    .join(TI.dag_run)
                    .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
                    .join(TI.dag_model)
   ```
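The sketch below shows where the hint lands in the generated SQL. It builds a simplified version of the query with SQLAlchemy Core and compiles it for MySQL. The tables are hypothetical, trimmed-down stand-ins for Airflow's `task_instance`, `dag_run` and `dag` models, with only the columns the filters touch. The real scheduler code uses the ORM (`session.query(TI)`). The sketch assumes SQLAlchemy 1.4 or later.

```python
from sqlalchemy import Boolean, Column, Index, MetaData, String, Table, and_, select
from sqlalchemy.dialects import mysql

metadata = MetaData()

# Hypothetical, trimmed-down stand-ins for Airflow's tables.
ti = Table(
    "task_instance", metadata,
    Column("dag_id", String(250), primary_key=True),
    Column("task_id", String(250), primary_key=True),
    Column("run_id", String(250), primary_key=True),
    Column("state", String(20)),
    Index("ti_state", "state"),  # the index the hint refers to
)
dr = Table(
    "dag_run", metadata,
    Column("dag_id", String(250)),
    Column("run_id", String(250)),
    Column("run_type", String(50)),
    Column("state", String(50)),
)
dag = Table(
    "dag", metadata,
    Column("dag_id", String(250), primary_key=True),
    Column("is_paused", Boolean),
)

query = (
    select(ti)
    .select_from(
        ti.join(dr, and_(ti.c.dag_id == dr.c.dag_id, ti.c.run_id == dr.c.run_id))
        .join(dag, ti.c.dag_id == dag.c.dag_id)
    )
    .where(
        ti.c.state == "scheduled",
        dr.c.run_type != "backfill",
        dr.c.state == "running",
        dag.c.is_paused.is_(False),
    )
    # Rendered right after the task_instance table name, and only for MySQL.
    .with_hint(ti, "USE INDEX (ti_state)", dialect_name="mysql")
    .limit(512)
)

sql = str(query.compile(dialect=mysql.dialect()))
print(sql)
```

MySQL only accepts index hints directly after a table reference in the `FROM` clause. SQLAlchemy places the text registered with `with_hint` in exactly that position for the hinted table, so the joins that follow are unaffected.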
   
   I think it makes sense to add this index hint upstream.
   
   ### How to reproduce
   
   Schedule a large number of dag runs and tasks in a short period of time.
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   Airflow 2.2.5 on Kubernetes
   MySQL Version: 8.0.18
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] j-martin commented on issue #25627: MySQL Not Using Correct Index for Scheduler Critical Section Query

Posted by GitBox <gi...@apache.org>.
j-martin commented on issue #25627:
URL: https://github.com/apache/airflow/issues/25627#issuecomment-1212484587

   > Adding the hint globally and the dialect_name part would make it only apply for MySQL
   
   Yes, you are right!




[GitHub] [airflow] j-martin commented on issue #25627: Scheduler Critical Section Query Not Using Correct Index

j-martin commented on issue #25627:
URL: https://github.com/apache/airflow/issues/25627#issuecomment-1211332715

   Yes, it is. @michaelmicheal, can you update the title to make it clear that we are talking about MySQL?
   
   A question for the Airflow maintainers: would it be acceptable to open a PR that adds the index hint when using MySQL?
   
   ```python
   # Apply the hint only when the session is bound to a MySQL database
   if session.get_bind().dialect.name == "mysql":
       query = query.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
   ```




[GitHub] [airflow] uranusjr commented on issue #25627: Scheduler Critical Section Query Not Using Correct Index

uranusjr commented on issue #25627:
URL: https://github.com/apache/airflow/issues/25627#issuecomment-1210220374

   Is this specific to MySQL?




[GitHub] [airflow] uranusjr commented on issue #25627: MySQL Not Using Correct Index for Scheduler Critical Section Query

uranusjr commented on issue #25627:
URL: https://github.com/apache/airflow/issues/25627#issuecomment-1211456966

   I don’t think you need the `if mysql` part (just adding the hint globally and the `dialect_name` part would make it only apply for MySQL), but yes a PR would be welcomed.
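The point about `dialect_name` can be checked directly. SQLAlchemy attaches a hint registered with `dialect_name="mysql"` only when the statement is compiled for the MySQL dialect, so other backends get plain SQL without an explicit `if mysql` branch. The sketch below uses a hypothetical one-column `task_instance` table and assumes SQLAlchemy 1.4 or later.

```python
from sqlalchemy import Column, MetaData, String, Table, select
from sqlalchemy.dialects import mysql, postgresql, sqlite

metadata = MetaData()
# Hypothetical minimal table: only the filtered column is declared.
ti = Table("task_instance", metadata, Column("state", String(20)))

# The hint is added unconditionally but scoped to the MySQL dialect.
query = (
    select(ti)
    .where(ti.c.state == "scheduled")
    .with_hint(ti, "USE INDEX (ti_state)", dialect_name="mysql")
)

# Compile the same statement for several backends without connecting to any.
rendered = {
    name: str(query.compile(dialect=dialect))
    for name, dialect in (
        ("mysql", mysql.dialect()),
        ("postgresql", postgresql.dialect()),
        ("sqlite", sqlite.dialect()),
    )
}

for name, sql in rendered.items():
    print(f"--- {name}\n{sql}")
```

Only the MySQL rendering contains `USE INDEX (ti_state)`. The PostgreSQL and SQLite versions are the unhinted `SELECT`.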




[GitHub] [airflow] ashb closed issue #25627: MySQL Not Using Correct Index for Scheduler Critical Section Query

ashb closed issue #25627: MySQL Not Using Correct Index for Scheduler Critical Section Query
URL: https://github.com/apache/airflow/issues/25627

