Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/05/05 14:06:12 UTC

[GitHub] [airflow] MatthewRBruce commented on issue #15559: airflow dag success , but tasks not yet started,not scheduled.

MatthewRBruce commented on issue #15559:
URL: https://github.com/apache/airflow/issues/15559#issuecomment-832716285


   Okay, so we dug into this and here's what we found (TL;DR: we think we're getting bitten by MySQL's default isolation level, REPEATABLE READ):
   
   ```
   from airflow import settings
   from airflow.models import dagrun
   from airflow.models.taskinstance import TaskInstance

   session = settings.Session()  # long-lived session: its first read pins the transaction snapshot
   cls = dagrun.DagRun
   ```
   Here we query the most recent DagRuns for `airflow-utils.send-airflow-heartbeat_every_minute` and see runs up to `13:37:00`:
   ```
   dag_run_query = session.query(cls).filter(cls.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(cls.execution_date.desc()).limit(10)
   for dag_run in dag_run_query:
   ...   print(dag_run)
   ...
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:37:00+00:00: scheduled__2021-05-05T13:37:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:36:00+00:00: scheduled__2021-05-05T13:36:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:35:00+00:00: scheduled__2021-05-05T13:35:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:34:00+00:00: scheduled__2021-05-05T13:34:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:33:00+00:00: scheduled__2021-05-05T13:33:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:32:00+00:00: scheduled__2021-05-05T13:32:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:31:00+00:00: scheduled__2021-05-05T13:31:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:30:00+00:00: scheduled__2021-05-05T13:30:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:29:00+00:00: scheduled__2021-05-05T13:29:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:28:00+00:00: scheduled__2021-05-05T13:28:00+00:00, externally triggered: False>
   ```
   
   Next we wait ~5 mins and run the same query again, but with `FOR UPDATE`, and we get the new rows (this is how the scheduler determines which DagRuns to schedule: https://github.com/apache/airflow/blob/master/airflow/jobs/scheduler_job.py#L1477):
   ```
   dag_query_run_update = session.query(cls).filter(cls.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(cls.execution_date.desc()).limit(10).with_for_update()
   for dag_run in dag_query_run_update:
   ...   print(dag_run)
   ...
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:42:00+00:00: scheduled__2021-05-05T13:42:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:41:00+00:00: scheduled__2021-05-05T13:41:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:40:00+00:00: scheduled__2021-05-05T13:40:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:39:00+00:00: scheduled__2021-05-05T13:39:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:38:00+00:00: scheduled__2021-05-05T13:38:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:37:00+00:00: scheduled__2021-05-05T13:37:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:36:00+00:00: scheduled__2021-05-05T13:36:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:35:00+00:00: scheduled__2021-05-05T13:35:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:34:00+00:00: scheduled__2021-05-05T13:34:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:33:00+00:00: scheduled__2021-05-05T13:33:00+00:00, externally triggered: False>
   ```
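   Under REPEATABLE READ, a plain `SELECT` is a consistent read served from the snapshot taken at the transaction's first read, while `SELECT ... FOR UPDATE` is a locking read that always fetches the latest committed rows. A minimal pure-Python model of that mismatch (the timestamps are illustrative, not real Airflow data):
   ```python
   # Toy model of MySQL REPEATABLE READ: plain SELECTs are served from the
   # snapshot frozen at the transaction's first read, but locking reads
   # (SELECT ... FOR UPDATE) bypass the snapshot and see current data.

   snapshot_rows = {"13:35", "13:36", "13:37"}            # frozen at first read
   latest_committed = snapshot_rows | {"13:38", "13:39"}  # committed since then

   def plain_select():
       """Consistent read: sees only the snapshot."""
       return snapshot_rows

   def select_for_update():
       """Locking read: sees the latest committed rows."""
       return latest_committed

   # The FOR UPDATE query returns new DagRuns, but the follow-up plain
   # SELECT for their task instances comes up empty:
   new_runs = select_for_update() - plain_select()
   tis_found = new_runs & plain_select()
   print(sorted(new_runs), sorted(tis_found))  # ['13:38', '13:39'] []
   ```
   The same pattern shows up in the session above: the `FOR UPDATE` DagRun query returns rows up to 13:42, but the plain TaskInstance query still stops at 13:37.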
   
   So, there are some new DagRuns, great.  But when the scheduler goes to get the related task instances (https://github.com/apache/airflow/blob/master/airflow/models/dagrun.py#L307-L310), it executes that query inside the original snapshot:
   ```
   ti_query = session.query(TaskInstance).filter(TaskInstance.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(TaskInstance.execution_date.desc()).limit(10)
   for ti in ti_query:
   ...   print (ti)
   ...
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:37:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:36:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:35:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:34:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:33:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:32:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:31:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:30:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:29:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:28:00+00:00 [success]>
   ```
   As we can see above, this query can only see the TaskInstances up to 13:37:00, so it finds 0 task instances for the newer runs, which means this check (https://github.com/apache/airflow/blob/master/airflow/models/dagrun.py#L444) marks the new DagRuns as successful even though their tasks never started.
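
   Two ways to avoid the stale snapshot (neither is confirmed as the scheduler's actual fix, just options consistent with the behavior above): end the transaction before re-reading so a fresh snapshot is taken, or run the metadata DB at READ COMMITTED, where each statement gets its own snapshot. A hedged configuration sketch, with a placeholder DSN:
   ```python
   from sqlalchemy import create_engine
   from sqlalchemy.orm import sessionmaker

   # Placeholder DSN -- substitute your own. SQLAlchemy's isolation_level
   # maps to MySQL's transaction_isolation; READ COMMITTED takes a new
   # snapshot per statement instead of per transaction.
   engine = create_engine(
       "mysql+mysqldb://user:pass@host/airflow",
       isolation_level="READ COMMITTED",
   )
   Session = sessionmaker(bind=engine)

   # Alternatively, under REPEATABLE READ, ending the transaction discards
   # the old snapshot, so the next query sees newly committed rows:
   # session.rollback()
   # ti_query = session.query(TaskInstance)...
   ```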
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org