Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/28 03:58:36 UTC

[GitHub] [airflow] trollhe opened a new issue #15559: airflow dag success , but tasks not yet started,not scheduled.

trollhe opened a new issue #15559:
URL: https://github.com/apache/airflow/issues/15559


   Hi team,
   
   My DAG is scheduled to run every minute. Some of its DAG runs are in the success state, but the tasks in those runs are still "not yet started":
   
   ![image](https://user-images.githubusercontent.com/41068725/116344409-c1db7a80-a818-11eb-8e6e-62dce24e8b37.png)
   
   How can I fix this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] SamWheating edited a comment on issue #15559: airflow dag success , but tasks not yet started,not scheduled.

Posted by GitBox <gi...@apache.org>.
SamWheating edited a comment on issue #15559:
URL: https://github.com/apache/airflow/issues/15559#issuecomment-831450306


   We've been experiencing the same issue (Airflow 2.0.2, 2x schedulers, MySQL 8).
   
   Furthermore, when a task has `depends_on_past=True`, this causes the DAG to lock up completely, as no future runs can be created.
   
   I'm currently trying to recreate this by running some high-frequency DAGs with and without multiple schedulers; I'll update here with my findings.
   
   ### Update:
   
   I was able to reproduce this by running 3 schedulers with a DAG scheduled every minute:
   ![image](https://user-images.githubusercontent.com/16950874/116931200-b8924980-ac2e-11eb-9676-e59a04fa16d7.png)
   
   From the scheduler logs:
   `DEBUG - number of tis tasks for <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-03 20:09:00+00:00: scheduled__2021-05-03T20:09:00+00:00, externally triggered: False>: 0 task(s)`
   
   So it looks like the scheduler was running [DagRun.update_state()](https://github.com/apache/airflow/blob/73187871703bce22783a42db3d3cec9045ee1de2/airflow/models/dagrun.py#L384) before any task instances had been created, which caused the DAG run to be marked as successful.
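To see why zero task instances can produce "success", here is a deliberately simplified, hypothetical model of the decision (`decide_run_state()` is not Airflow's code). It assumes, consistent with the "0 task(s)" log line followed by "Marking run ... successful", that a run is marked successful once no task instance is unfinished and every leaf task instance has succeeded or been skipped. Python's `all()` over an empty list is `True`, so a run whose task instances are not visible at all passes that check vacuously:

```python
from enum import Enum
from typing import Iterable, Optional


class State(str, Enum):
    # Hypothetical subset of task-instance states, for illustration only.
    SUCCESS = "success"
    SKIPPED = "skipped"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"
    RUNNING = "running"
    NONE = "none"  # created, but not yet scheduled


FINISHED = {State.SUCCESS, State.SKIPPED, State.FAILED, State.UPSTREAM_FAILED}


def decide_run_state(leaf_ti_states: Iterable[State]) -> Optional[str]:
    """Simplified sketch of a DAG-run state decision (not Airflow's implementation)."""
    states = list(leaf_ti_states)
    unfinished = [s for s in states if s not in FINISHED]
    if not unfinished and any(s in {State.FAILED, State.UPSTREAM_FAILED} for s in states):
        return "failed"
    # all() over an empty list is True, so "no task instances" reads as "all succeeded".
    if not unfinished and all(s in {State.SUCCESS, State.SKIPPED} for s in states):
        return "success"
    return None  # still in progress: leave the run's state unchanged


print(decide_run_state([State.NONE]))  # prints None: the TI exists but has not started
print(decide_run_state([]))            # prints success: no TIs were visible at all
```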
   
   Could this be either:
   1) A race condition in the scheduler code?
   2) A result of improper scheduler locking? (We're using MySQL 8 with `use_row_level_locking=True`.)
   
   ### Update 2:
   
   I did some more investigating today and found that while the DagRun and its TaskInstances are created by one scheduler, the DagRun is marked as successful shortly afterwards by a different scheduler.
   
   Here are selected logs from one occurrence of this issue (I added debug log entries to the TaskInstance and DagRun constructors so that I could see when and where they were being created):
   
   **airflow-scheduler-1:**
   ```
   2021-05-04 20:12:00,123 dag.py: DEBUG - Created DagRun <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-04 20:11:00+00:00: scheduled__2021-05-04T20:11:00+00:00, externally triggered: False>
   
   2021-05-04 20:12:00,126 taskinstance.py: DEBUG - Created TaskInstance <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-04 20:11:00+00:00 [None]>
   
   2021-05-04 20:12:00,134 scheduler_job.py: INFO - DAG airflow-utils.send-airflow-heartbeat_every_minute is at (or above) max_active_runs (1 of 1), not creating any more runs
   ```
   
   **airflow-scheduler-2:**
   ```
   2021-05-04 20:12:00,156 dagrun.py: DEBUG - number of tis tasks for <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-04 20:11:00+00:00: scheduled__2021-05-04T20:11:00+00:00, externally triggered: False>: 0 task(s)
   
   2021-05-04 20:12:00,156 dagrun.py: INFO - Marking run <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-04 20:11:00+00:00: scheduled__2021-05-04T20:11:00+00:00, externally triggered: False> successful
   ```
   
   So within ~30ms of the DagRun being created, another scheduler instance marked it as successful because it saw no task instances for it.
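The ~30ms figure can be checked against the log timestamps above with the standard library. This is just a small worked calculation; `ms_between()` is an illustrative helper, not part of Airflow:

```python
from datetime import datetime, timedelta

# Timestamp format of the scheduler log lines above. When parsing, %f accepts
# 1-6 digits and zero-pads on the right, so ",123" becomes 123000 microseconds.
LOG_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def ms_between(start: str, end: str) -> int:
    """Whole milliseconds elapsed between two log timestamps."""
    delta = datetime.strptime(end, LOG_FORMAT) - datetime.strptime(start, LOG_FORMAT)
    return delta // timedelta(milliseconds=1)


# DagRun created (scheduler-1) -> run marked successful (scheduler-2)
gap_after_run = ms_between("2021-05-04 20:12:00,123", "2021-05-04 20:12:00,156")
# TaskInstance created (scheduler-1) -> run marked successful (scheduler-2)
gap_after_ti = ms_between("2021-05-04 20:12:00,126", "2021-05-04 20:12:00,156")
print(gap_after_run, gap_after_ti)  # prints: 33 30
```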





[GitHub] [airflow] SamWheating commented on issue #15559: airflow dag success , but tasks not yet started,not scheduled.

Posted by GitBox <gi...@apache.org>.
SamWheating commented on issue #15559:
URL: https://github.com/apache/airflow/issues/15559#issuecomment-832942105


   Just following up to confirm that switching our database's isolation level from `REPEATABLE READ` to `READ COMMITTED` fixed the original issue in our case and allowed us to run multiple schedulers without issue. 
   
   It's worth noting that the default for Postgres is `READ COMMITTED`, so Postgres users likely did not encounter this issue.
   
   @trollhe, could you confirm the following?
   1) You're also using MySQL 8 with `REPEATABLE READ` isolation level. 
   2) Changing the isolation level to `READ COMMITTED` fixes the original issue. 
   
   ### Fixing this Issue
   
   I think there are a few possible ways to "fix" this issue:
   
   1) Add a note in the docs that reminds users to configure the correct isolation level when using MySQL 8.
   2) Add a check in [`create_engine_args()`](https://github.com/apache/airflow/blob/d295e708d982082caa68b08ab29c8aa4bf67dce5/airflow/settings.py#L240) to add `isolation_level="READ COMMITTED"` when MySQL is being used.
   3) Add a config option to provide additional engine args, or to set the isolation level.
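Option 2 could look roughly like the sketch below. `build_engine_args()` is a hypothetical helper, not Airflow's actual `create_engine_args()`; the only real API it targets is the `isolation_level` keyword of SQLAlchemy's `create_engine()`, for which `"READ COMMITTED"` is a valid value on MySQL. The backend is detected from the connection URL's scheme, so driver suffixes such as `+mysqldb` are ignored:

```python
from typing import Any, Dict, Optional
from urllib.parse import urlsplit


def build_engine_args(sql_alchemy_conn: str, base_args: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Hypothetical helper: extra keyword arguments for sqlalchemy.create_engine().

    Defaults MySQL connections to READ COMMITTED unless the caller has
    already chosen an isolation level explicitly.
    """
    args = dict(base_args or {})  # copy so the caller's dict is never mutated
    # "mysql+mysqldb://..." -> scheme "mysql+mysqldb" -> backend "mysql"
    backend = urlsplit(sql_alchemy_conn).scheme.split("+", 1)[0]
    if backend == "mysql":
        args.setdefault("isolation_level", "READ COMMITTED")
    return args


print(build_engine_args("mysql+mysqldb://airflow:airflow@localhost:3306/airflow"))
# prints: {'isolation_level': 'READ COMMITTED'}
```

Using `setdefault` leaves an explicitly configured isolation level alone, which would also fit option 3. Passing the result on would look like `create_engine(conn, **build_engine_args(conn))`.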
   
   Any thoughts on the preferred fix? Feel free to assign this to me and I can implement one of these fixes this week.  







[GitHub] [airflow] MatthewRBruce commented on issue #15559: airflow dag success , but tasks not yet started,not scheduled.

Posted by GitBox <gi...@apache.org>.
MatthewRBruce commented on issue #15559:
URL: https://github.com/apache/airflow/issues/15559#issuecomment-832749201


   And we confirmed that with `READ COMMITTED`, we get the expected results:
   ```
   >>> dag_run_query = session.query(cls).filter(cls.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(cls.execution_date.desc()).limit(10)
   >>> for dag_run in dag_run_query:
   ...   print(dag_run)
   ...
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:31:00+00:00: scheduled__2021-05-05T14:31:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:30:00+00:00: scheduled__2021-05-05T14:30:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:29:00+00:00: scheduled__2021-05-05T14:29:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:28:00+00:00: scheduled__2021-05-05T14:28:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:27:00+00:00: scheduled__2021-05-05T14:27:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:26:00+00:00: scheduled__2021-05-05T14:26:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:25:00+00:00: scheduled__2021-05-05T14:25:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:24:00+00:00: scheduled__2021-05-05T14:24:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:23:00+00:00: scheduled__2021-05-05T14:23:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:22:00+00:00: scheduled__2021-05-05T14:22:00+00:00, externally triggered: False>
   ```
   (wait a few minutes)
   ```
   >>> dag_query_run_update = session.query(cls).filter(cls.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(cls.execution_date.desc()).limit(10).with_for_update()
   >>> for dag_run in dag_query_run_update:
   ...   print(dag_run)
   ...
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:34:00+00:00: scheduled__2021-05-05T14:34:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:33:00+00:00: scheduled__2021-05-05T14:33:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:32:00+00:00: scheduled__2021-05-05T14:32:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:31:00+00:00: scheduled__2021-05-05T14:31:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:30:00+00:00: scheduled__2021-05-05T14:30:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:29:00+00:00: scheduled__2021-05-05T14:29:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:28:00+00:00: scheduled__2021-05-05T14:28:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:27:00+00:00: scheduled__2021-05-05T14:27:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:26:00+00:00: scheduled__2021-05-05T14:26:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 14:25:00+00:00: scheduled__2021-05-05T14:25:00+00:00, externally triggered: False>
   >>> ti_query = session.query(TaskInstance).filter(TaskInstance.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(TaskInstance.execution_date.desc()).limit(10)
   >>> for ti in ti_query:
   ...   print(ti)
   ...
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:34:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:33:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:32:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:31:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:30:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:29:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:28:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:27:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:26:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 14:25:00+00:00 [success]>
   ```
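A quick way to spot the symptom in listings like these is to compare the execution dates returned by the DagRun and TaskInstance queries. This is a minimal sketch, assuming one task per run (true for this heartbeat DAG, which only has `send_heartbeat`); `runs_without_tis()` and `minute_dates()` are hypothetical helpers, and the dates are the ones printed above and in the REPEATABLE READ session elsewhere in this thread:

```python
from datetime import datetime, timedelta
from typing import Iterable, List


def runs_without_tis(run_dates: Iterable[datetime], ti_dates: Iterable[datetime]) -> List[datetime]:
    """Hypothetical check: execution dates that have a DagRun but no visible TaskInstance."""
    seen = set(ti_dates)
    # Newest first, matching the order_by(...desc()) queries in the transcripts.
    return sorted((d for d in run_dates if d not in seen), reverse=True)


def minute_dates(newest: datetime, count: int) -> List[datetime]:
    """`count` consecutive one-minute execution dates, newest first."""
    return [newest - timedelta(minutes=i) for i in range(count)]


# READ COMMITTED session above: runs and TIs both cover 14:25-14:34.
rc_runs = minute_dates(datetime(2021, 5, 5, 14, 34), 10)
rc_tis = minute_dates(datetime(2021, 5, 5, 14, 34), 10)
print(runs_without_tis(rc_runs, rc_tis))  # prints: []

# REPEATABLE READ session: FOR UPDATE saw runs 13:33-13:42,
# but the plain TI query only saw 13:28-13:37.
rr_runs = minute_dates(datetime(2021, 5, 5, 13, 42), 10)
rr_tis = minute_dates(datetime(2021, 5, 5, 13, 37), 10)
print([d.strftime("%H:%M") for d in runs_without_tis(rr_runs, rr_tis)])
# prints: ['13:42', '13:41', '13:40', '13:39', '13:38']
```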





[GitHub] [airflow] potiuk closed issue #15559: airflow dag success , but tasks not yet started,not scheduled.

Posted by GitBox <gi...@apache.org>.
potiuk closed issue #15559:
URL: https://github.com/apache/airflow/issues/15559


   





[GitHub] [airflow] MatthewRBruce commented on issue #15559: airflow dag success , but tasks not yet started,not scheduled.

Posted by GitBox <gi...@apache.org>.
MatthewRBruce commented on issue #15559:
URL: https://github.com/apache/airflow/issues/15559#issuecomment-832716285


   Okay, so we dug into this, and here's what we found (TL;DR: we think we're getting bitten by MySQL's default isolation level of `REPEATABLE READ`):
   
   ```
   from airflow import settings
   from airflow.models import dagrun
   from airflow.models.dag import DagModel
   from airflow.models.dagrun import DagRun
   from airflow.models.taskinstance import TaskInstance
   from airflow.utils.types import DagRunType
   from airflow.utils.state import State
   from sqlalchemy import and_, func, not_, or_, tuple_
   from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, skip_locked, with_row_locks
   
   session = settings.Session()
   cls = dagrun.DagRun
   ```
   Here we execute a query to get the recent DagRuns for `airflow-utils.send-airflow-heartbeat_every_minute` and see runs up to `13:37:00`:
   ```
   >>> dag_run_query = session.query(cls).filter(cls.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(cls.execution_date.desc()).limit(10)
   >>> for dag_run in dag_run_query:
   ...   print(dag_run)
   ...
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:37:00+00:00: scheduled__2021-05-05T13:37:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:36:00+00:00: scheduled__2021-05-05T13:36:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:35:00+00:00: scheduled__2021-05-05T13:35:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:34:00+00:00: scheduled__2021-05-05T13:34:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:33:00+00:00: scheduled__2021-05-05T13:33:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:32:00+00:00: scheduled__2021-05-05T13:32:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:31:00+00:00: scheduled__2021-05-05T13:31:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:30:00+00:00: scheduled__2021-05-05T13:30:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:29:00+00:00: scheduled__2021-05-05T13:29:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:28:00+00:00: scheduled__2021-05-05T13:28:00+00:00, externally triggered: False>
   ```
   
   Next we wait ~5 minutes and run this query again, but with `FOR UPDATE`, and we get the new rows (this is how the scheduler determines which DAG runs to schedule: https://github.com/apache/airflow/blob/master/airflow/jobs/scheduler_job.py#L1477):
   ```
   >>> dag_query_run_update = session.query(cls).filter(cls.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(cls.execution_date.desc()).limit(10).with_for_update()
   >>> for dag_run in dag_query_run_update:
   ...   print(dag_run)
   ...
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:42:00+00:00: scheduled__2021-05-05T13:42:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:41:00+00:00: scheduled__2021-05-05T13:41:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:40:00+00:00: scheduled__2021-05-05T13:40:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:39:00+00:00: scheduled__2021-05-05T13:39:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:38:00+00:00: scheduled__2021-05-05T13:38:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:37:00+00:00: scheduled__2021-05-05T13:37:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:36:00+00:00: scheduled__2021-05-05T13:36:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:35:00+00:00: scheduled__2021-05-05T13:35:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:34:00+00:00: scheduled__2021-05-05T13:34:00+00:00, externally triggered: False>
   <DagRun airflow-utils.send-airflow-heartbeat_every_minute @ 2021-05-05 13:33:00+00:00: scheduled__2021-05-05T13:33:00+00:00, externally triggered: False>
   ```
   
   So there are some new DagRuns, great. When the scheduler fetches the related task instances (https://github.com/apache/airflow/blob/master/airflow/models/dagrun.py#L307-L310), it executes the query against the original snapshot:
   ```
   >>> ti_query = session.query(TaskInstance).filter(TaskInstance.dag_id=='airflow-utils.send-airflow-heartbeat_every_minute').order_by(TaskInstance.execution_date.desc()).limit(10)
   >>> for ti in ti_query:
   ...   print(ti)
   ...
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:37:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:36:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:35:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:34:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:33:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:32:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:31:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:30:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:29:00+00:00 [success]>
   <TaskInstance: airflow-utils.send-airflow-heartbeat_every_minute.send_heartbeat 2021-05-05 13:28:00+00:00 [success]>
   ```
   As we can see above, this query can only see the TIs up to 13:37:00, so it finds 0 tasks for the recent runs, which means the code at https://github.com/apache/airflow/blob/master/airflow/models/dagrun.py#L444 marks the DAG run as successful.
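The snapshot behaviour described above can be reproduced with a toy, in-memory model. This is a sketch of InnoDB's documented read semantics, not MySQL itself: under REPEATABLE READ, a plain SELECT reads the snapshot established by the transaction's first consistent read; a locking read (`SELECT ... FOR UPDATE`) reads the latest committed rows; and under READ COMMITTED, every plain SELECT takes a fresh snapshot. `ToyDatabase`, `ToyTransaction` and `scenario()` are hypothetical names, and row locking itself is not modelled:

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

Row = Tuple[str, str]  # (table, execution time), e.g. ("task_instance", "13:38")


@dataclass
class ToyDatabase:
    """Committed rows tagged with a commit sequence number (a toy MVCC log)."""
    versions: List[Tuple[int, Row]] = field(default_factory=list)
    commit_seq: int = 0

    def commit(self, *rows: Row) -> None:
        self.commit_seq += 1
        self.versions.extend((self.commit_seq, row) for row in rows)

    def visible(self, table: str, as_of: int) -> List[str]:
        return sorted(key for seq, (tbl, key) in self.versions if tbl == table and seq <= as_of)


@dataclass
class ToyTransaction:
    db: ToyDatabase
    isolation: str  # "REPEATABLE READ" or "READ COMMITTED"
    snapshot: Optional[int] = None

    def select(self, table: str) -> List[str]:
        """Plain (consistent, non-locking) read."""
        if self.isolation == "READ COMMITTED":
            return self.db.visible(table, self.db.commit_seq)  # fresh snapshot per statement
        if self.snapshot is None:
            self.snapshot = self.db.commit_seq  # REPEATABLE READ: fixed at the first read
        return self.db.visible(table, self.snapshot)

    def select_for_update(self, table: str) -> List[str]:
        """Locking read: sees the latest committed rows regardless of the snapshot."""
        return self.db.visible(table, self.db.commit_seq)


def scenario(isolation: str) -> Tuple[List[str], List[str]]:
    db = ToyDatabase()
    db.commit(("dag_run", "13:37"), ("task_instance", "13:37"))
    txn = ToyTransaction(db, isolation)
    txn.select("dag_run")  # first plain read, like the first query in the session above
    # Meanwhile another scheduler commits a new run together with its task instance.
    db.commit(("dag_run", "13:38"), ("task_instance", "13:38"))
    runs = txn.select_for_update("dag_run")  # like the FOR UPDATE DagRun query
    tis = txn.select("task_instance")        # like the plain TaskInstance query
    return runs, tis


print(scenario("REPEATABLE READ"))  # prints: (['13:37', '13:38'], ['13:37'])
print(scenario("READ COMMITTED"))   # prints: (['13:37', '13:38'], ['13:37', '13:38'])
```

Under REPEATABLE READ, the run for 13:38 is visible to the locking read while its task instance is not, which is the same mismatch shown in the transcripts; under READ COMMITTED both reads agree.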
   
   
   
   





[GitHub] [airflow] DHARESHWAR commented on issue #15559: airflow dag success , but tasks not yet started,not scheduled.

Posted by GitBox <gi...@apache.org>.
DHARESHWAR commented on issue #15559:
URL: https://github.com/apache/airflow/issues/15559#issuecomment-828197217


   I am facing a similar issue on **Airflow 2.0.1**.
   The DAG run is marked successful but the task never started. It happens when I run 2 Airflow schedulers.
   
   
   <img width="1437" alt="Screenshot 2021-04-28 at 12 23 38 PM" src="https://user-images.githubusercontent.com/9838200/116359654-ae321300-a81c-11eb-9262-365170446c3f.png">
   




