Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/12/31 15:59:27 UTC

[GitHub] [airflow] soltanianalytics opened a new issue #13407: Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks

soltanianalytics opened a new issue #13407:
URL: https://github.com/apache/airflow/issues/13407


   **Apache Airflow version**: 2.0, LocalExecutor
   
   **Environment**: Docker on Windows 10 with WSL using image `apache/airflow:2.0.0-python3.8`
   
   **What happened**:
   
   Situation:
   - There is a DAG, say `mydag`, with
     - `catchup=True`
     - `max_active_runs=1`
   - Let's say there are two DAG runs, t=0 and t=1
   - The first task of the DAG is a sensor that senses whether the previous DAG run was successful
   - Now, t=0 runs, its tasks are scheduled, and a task in t=0 fails
   - Then t=1 runs, and its first task (the sensor) cannot sense a successful t=0, so it keeps sensing
   - Now I clear the failed task in t=0 and expect it to run again, as it did in Airflow 1.x
   - It doesn't; instead, the scheduler logs the following:
   
   ```
   scheduler_1  | [2020-12-31 15:25:32,770] {scheduler_job.py:1667} INFO - DAG mydag already has 1 active runs, not queuing any tasks for run 2020-12-26 05:00:00+00:00 [note: this is t=0]
   ```
   
   Thus, t=0 never finishes, t=1 never senses a finished t=0, and no later run t=n with n>1 has any chance of ever succeeding either.
   
   One alternative would be to remove the `max_active_runs` constraint, but that is not feasible: with `catchup=True` it would create hundreds of DAG runs at once, which is a complete performance killer.
   
   **What you expected to happen**:
   
   As with previous Airflow versions, I would expect the cleared tasks to be scheduled again, but they aren't.
   
   **Why this happens**:
   
   tl;dr Ultimately, this happens because Airflow uses `TI` instead of `DR` here: https://github.com/apache/airflow/blob/v2-0-stable/airflow/jobs/scheduler_job.py#L1499-L1509
   
   `_do_scheduling()` calls `_schedule_dag_run()` for each DAG run it examines and passes in the set of active DAG runs for that run's `dag_id`, here: https://github.com/apache/airflow/blob/v2-0-stable/airflow/jobs/scheduler_job.py#L1515. The tasks that _should_ be queued are not queued because their DAG runs are not in that set of active DAG runs, even though those DAG runs are `running`. This is because the query at https://github.com/apache/airflow/blob/v2-0-stable/airflow/jobs/scheduler_job.py#L1499-L1509 looks at the execution dates of unfinished `TaskInstance`s instead of at the `DagRun`s. The tasks of the cleared runs were either successful, or failed and were then cleared, which leaves them with a `NULL` state that `NOT IN` does not match, so those runs are filtered out of the query. If you replace `TI` with `DR` in that query, this should work fine, fixing this issue without breaking anything that currently works.
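A minimal, self-contained sketch of the difference, using Python's built-in `sqlite3` instead of Airflow's metadata database. The two tables are simplified, hypothetical stand-ins for `task_instance` and `dag_run`, and the `FINISHED` tuple is an assumption modelled on `State.finished`. The data mirrors the situation above: t=0 has one successful task and one failed-then-cleared task, and t=1 has a running sensor.

```python
import sqlite3

# Assumed set of "finished" task states, modelled on Airflow's State.finished.
FINISHED = ("success", "failed", "skipped", "upstream_failed")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (dag_id, task_id, execution_date, state)")
conn.execute("CREATE TABLE dag_run (dag_id, execution_date, state)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("mydag", "sensor", "t0", "success"),
        ("mydag", "work", "t0", None),  # failed, then cleared: state is NULL
        ("mydag", "sensor", "t1", "running"),  # keeps waiting for t0
        ("mydag", "work", "t1", None),  # not started yet: state is NULL
    ],
)
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?)",
    [("mydag", "t0", "running"), ("mydag", "t1", "running")],
)

# Current logic: a run is active if any of its task instances is unfinished.
# NULL NOT IN (...) evaluates to NULL rather than true, so NULL-state rows
# (cleared or not-yet-started tasks) never make their run count as active.
placeholders = ", ".join("?" * len(FINISHED))
ti_based = {
    date
    for (date,) in conn.execute(
        "SELECT execution_date FROM task_instance "
        f"WHERE dag_id = 'mydag' AND state NOT IN ({placeholders}) "
        "GROUP BY dag_id, execution_date",
        FINISHED,
    )
}

# Proposed logic: a run is active if the DagRun row itself is running.
dr_based = {
    date
    for (date,) in conn.execute(
        "SELECT execution_date FROM dag_run WHERE dag_id = 'mydag' AND state = 'running'"
    )
}

print(sorted(ti_based))  # ['t1']
print(sorted(dr_based))  # ['t0', 't1']
```

With `max_active_runs=1`, the `TaskInstance`-based set already holds one run (t=1), so t=0 is treated as an additional run and none of its tasks are queued, which matches the scheduler log above. The `DagRun`-based set contains both runs.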
   
   **How to reproduce it**:
   
   You don't need the sensor logic described above to reproduce this behavior. I haven't tried it, but the following steps _should_ reproduce it:
   
   - Create a DAG `mydag` with `catchup=True` and `max_active_runs=1`
   - Give it a single dummy task and let it run a few times, so that you have several successful DAG runs
   - Pause the DAG*
   - Clear a few tasks in DAG runs that were successful
   - Run this snippet to compare the results of the query using `TI` and using `DR`
   
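   ```Python
   from airflow import models
   from airflow.utils.session import create_session
   from airflow.utils.state import State
   
   TI = models.TaskInstance
   DR = models.DagRun
   dag_id = "mydag"
   
   with create_session() as session:
       # Current logic: a run counts as active only if one of its task instances
       # is in a non-finished state. Cleared tasks have a NULL state, which
       # NOT IN (...) does not match, so they do not count.
       ti_rows = (
           session.query(TI.dag_id, TI.execution_date)
           .filter(TI.dag_id.in_([dag_id]), TI.state.notin_(list(State.finished)))
           .distinct()  # one row per DAG run instead of one per task instance
           .all()
       )
       # Proposed logic: a run counts as active if the DagRun itself is running.
       dr_rows = (
           session.query(DR.dag_id, DR.execution_date)
           .filter(DR.dag_id.in_([dag_id]), DR.state.in_([State.RUNNING]))
           .all()
       )
   
   print("\nactive DAG runs according to current code logic:")
   for row in ti_rows:
       print("\t", row)
   print("\nactive DAG runs according to my proposed code logic:")
   for row in dr_rows:
       print("\t", row)
   ```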
   
   *Pausing the DAG only prevents your Airflow instance from working through the DAG runs one by one; you would not need to pause if your DAG has a sensor that senses the success of the previous DAG run, as mine do.
   
   I will be creating a PR with the suggested fix shortly.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] soltanianalytics edited a comment on issue #13407: Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks (when max_active_runs is reached)

Posted by GitBox <gi...@apache.org>.
soltanianalytics edited a comment on issue #13407:
URL: https://github.com/apache/airflow/issues/13407#issuecomment-908426186


   Note that in the issue I described above, all `DagRun`s are already `running`, but their tasks are not scheduled.





[GitHub] [airflow] soltanianalytics commented on issue #13407: Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks (when max_active_runs is reached)

soltanianalytics commented on issue #13407:
URL: https://github.com/apache/airflow/issues/13407#issuecomment-908383868


   If I don't let the current one fail first, or if Airflow otherwise has a hiccup, it will simply never schedule the "correct" `DagRun`s because of `max_active_runs`. (My understanding of `max_active_runs` is that it should only apply when `DagRun`s are set to running, and that once they are running, Airflow should schedule all running `DagRun`s irrespective of `max_active_runs`; however, others seem to disagree with this interpretation.)
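Under that interpretation, `max_active_runs` would only be checked when deciding whether another run may enter the `running` state, and every run that is already `running` would have its tasks scheduled. A minimal sketch of that policy follows; the function names and the `(execution_date, state)` tuples are hypothetical simplifications, not Airflow's API.

```python
from typing import List, Tuple

# Hypothetical simplification of a DagRun: (execution_date, state).
Run = Tuple[str, str]


def may_start_new_run(runs: List[Run], max_active_runs: int) -> bool:
    """max_active_runs is only checked before another run becomes running."""
    running = sum(1 for _, state in runs if state == "running")
    return running < max_active_runs


def runs_to_schedule(runs: List[Run]) -> List[str]:
    """Every run that is already running gets its tasks considered."""
    return [date for date, state in runs if state == "running"]


# Clearing a task in t0 put it back into "running" while t1 was still running.
runs = [("t0", "running"), ("t1", "running")]
print(may_start_new_run(runs, max_active_runs=1))  # False: no further runs start
print(runs_to_schedule(runs))  # ['t0', 't1']: both running runs are scheduled
```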





[GitHub] [airflow] soltanianalytics commented on issue #13407: Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks

soltanianalytics commented on issue #13407:
URL: https://github.com/apache/airflow/issues/13407#issuecomment-753393443


   I did some more reading (mainly https://github.com/apache/airflow/issues/1442 and https://issues.apache.org/jira/browse/AIRFLOW-137). I see now that using `TI` was entirely intentional. Currently, `DagRun`s that were reset to `running` by clearing tasks are deliberately ignored until they can be scheduled without violating `max_active_runs`. My issues with this are:
   1. The scheduler does not schedule tasks in `DagRun`s which are, in fact, `running`
   2. When a user clears tasks, the user _wants_ these tasks to be scheduled; therefore I think violating `max_active_runs` (as in my use case) is intentional and a feature, not a bug
   
   However, from https://github.com/apache/airflow/issues/1442 I can see that a user might also want to run only specific tasks, but across a large number of `DagRun`s, while executing tasks in at most `max_active_runs` `DagRun`s at a time. Arguably, when using backfill or `catchup=True` in general, I would expect to be able to rely on tasks being executed in `execution_date` order, because that is also the order in which they run when my Airflow installation simply keeps running. Thus, I think a second alternative is to keep the logic above but adjust it so that only tasks in the first `max_active_runs` `DagRun`s, ordered by `execution_date`, are run.
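A minimal sketch of this second alternative, assuming runs are represented as hypothetical `(execution_date, state)` tuples rather than `DagRun` objects: among the runs that are `running`, only the `max_active_runs` oldest by `execution_date` may have tasks scheduled. It illustrates the ordering rule only and is not the implementation in the follow-up PR.

```python
from datetime import datetime
from typing import List, Tuple

# Hypothetical simplification of a DagRun: (execution_date, state).
Run = Tuple[datetime, str]


def runs_allowed_to_schedule(runs: List[Run], max_active_runs: int) -> List[datetime]:
    """Return the execution dates of the oldest running runs, up to the limit."""
    running = sorted(date for date, state in runs if state == "running")
    return running[:max_active_runs]


runs = [
    (datetime(2020, 12, 27, 5), "running"),  # t=1, sensor waiting on t=0
    (datetime(2020, 12, 26, 5), "running"),  # t=0, reset to running by clearing
    (datetime(2020, 12, 25, 5), "success"),
]
# Only the t=0 run (2020-12-26 05:00) is selected.
print(runs_allowed_to_schedule(runs, max_active_runs=1))
```

In the scenario from the issue, this picks t=0 first; once t=0 succeeds, t=1 becomes the oldest running run and its sensor can succeed.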
   
   I will create a second PR with this alternative approach.






[GitHub] [airflow] ephraimbuddy commented on issue #13407: Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks (when max_active_runs is reached)

ephraimbuddy commented on issue #13407:
URL: https://github.com/apache/airflow/issues/13407#issuecomment-908376141


   > The behavior has indeed changed since `2.0.0`, and while not _perfect_, I can deal with it for now - closing the ticket
   
   Could you provide more context on what is not _perfect_? I'm happy to look into this issue if you can explain more. Thanks!






[GitHub] [airflow] boring-cyborg[bot] commented on issue #13407: Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks

boring-cyborg[bot] commented on issue #13407:
URL: https://github.com/apache/airflow/issues/13407#issuecomment-752994230


   Thanks for opening your first issue here! Be sure to follow the issue template!
   





[GitHub] [airflow] ephraimbuddy commented on issue #13407: Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks (when max_active_runs is reached)

ephraimbuddy commented on issue #13407:
URL: https://github.com/apache/airflow/issues/13407#issuecomment-908371958


   @soltanianalytics could you please test this on 2.1.3? I was not able to reproduce it.





[GitHub] [airflow] soltanianalytics commented on issue #13407: Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks (when max_active_runs is reached)

soltanianalytics commented on issue #13407:
URL: https://github.com/apache/airflow/issues/13407#issuecomment-908374021


   The behavior has indeed changed since `2.0.0`, and while not _perfect_, I can deal with it for now - closing the ticket






[GitHub] [airflow] soltanianalytics closed issue #13407: Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks (when max_active_runs is reached)

soltanianalytics closed issue #13407:
URL: https://github.com/apache/airflow/issues/13407


   





[GitHub] [airflow] soltanianalytics commented on issue #13407: Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks (when max_active_runs is reached)

soltanianalytics commented on issue #13407:
URL: https://github.com/apache/airflow/issues/13407#issuecomment-908381563


   My core use case is re-running DAGs that are not currently running.
   
   So I have a DAG with `max_active_runs=1` and `catchup=True`. This DAG depends on the previous `DagRun`s being successful. I implement this logic via a sensor that senses the success of the last task of the previous `DagRun`. If the previous `DagRun` failed, the current one will keep sensing into the abyss. It might also fail after some time. Then I might have `n` DagRuns that I want to re-run, and this `n` can be in the dozens. If I just clear the tasks of all the DAG runs I want to re-run, they may not run in order; but because every run except the oldest has a sensor that will not succeed until the previous `DagRun` has succeeded, this only works if the `DagRun`s are executed in chronological order.
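The ordering constraint can be illustrated with a small simulation. It is a sketch under simplifying assumptions: each run is reduced to a single sensor that succeeds only once the previous run has succeeded, runs are processed one at a time (`max_active_runs=1`), and a run started before its predecessor has succeeded is assumed to time out and fail. The `rerun` function is hypothetical, not Airflow code.

```python
from typing import Dict, List


def rerun(order: List[int], first_ok: bool = True) -> Dict[int, str]:
    """Process cleared runs one at a time in the given order.

    A run succeeds only if run n-1 has already succeeded (run 0 depends on
    `first_ok`, i.e. on the run before the cleared range having succeeded);
    otherwise its sensor is assumed to time out and the run fails.
    """
    state: Dict[int, str] = {}
    for n in order:
        prev_ok = first_ok if n == 0 else state.get(n - 1) == "success"
        state[n] = "success" if prev_ok else "failed"
    return state


print(rerun([0, 1, 2, 3]))  # chronological: every run succeeds
print(rerun([2, 0, 3, 1]))  # arbitrary order: runs 2 and 3 fail again
```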
   
   I can make that happen if I let the currently active `DagRun` fail before clearing tasks and setting `DagRun` states to `running`. _Most of the time_, that should do the trick. If not, I'll just delete all relevant `DagRun`s and then they'll re-appear chronologically.
   
   





[GitHub] [airflow] turbaszek commented on issue #13407: Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks

turbaszek commented on issue #13407:
URL: https://github.com/apache/airflow/issues/13407#issuecomment-753449271


   CC @ashb 





[GitHub] [airflow] ephraimbuddy commented on issue #13407: Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks (when max_active_runs is reached)

ephraimbuddy commented on issue #13407:
URL: https://github.com/apache/airflow/issues/13407#issuecomment-908383428


   Thanks @soltanianalytics 






[GitHub] [airflow] ephraimbuddy commented on issue #13407: Clearing tasks for previously finished DAG runs in airflow 2.0 does not lead to scheduling of tasks (when max_active_runs is reached)

ephraimbuddy commented on issue #13407:
URL: https://github.com/apache/airflow/issues/13407#issuecomment-908424100


   Actually, the problem with `max_active_runs` is that the line of code below does not select queued DAG runs per `dag_id`:
   https://github.com/apache/airflow/blob/24aa34b12d1f20b6d59b76a3787bb633f7a86fd7/airflow/jobs/scheduler_job.py#L973
   
   For example, suppose dag1 has 8 queued DAG runs, dag2 has 5 and dag3 has 6.
   
   The code would take all 8 DAG runs from dag1 (if they have the closest dates), then 2 from dag2, and none from dag3.
   The correct behavior would be to fetch the queued DAG runs of each DAG separately and set them to running only while that DAG's `max_active_runs` is not reached.
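The difference between the two selection strategies can be sketched in plain Python. The per-loop limit of 10 is an assumption inferred from the numbers in the example (8 + 2 = 10); queued runs are reduced to their DAG ids, and the `max_active_runs` of 3 per DAG and the one already-running dag1 run are made up for illustration. This is not the scheduler's actual code.

```python
from collections import Counter
from typing import Dict, List

# Queued runs as DAG ids, oldest first; dag1's runs have the closest dates.
queued: List[str] = ["dag1"] * 8 + ["dag2"] * 5 + ["dag3"] * 6


def first_n_overall(queued: List[str], limit: int) -> Dict[str, int]:
    """Take the first `limit` queued runs, regardless of which DAG they belong to."""
    return dict(Counter(queued[:limit]))


def per_dag(
    queued: List[str], running: Dict[str, int], max_active_runs: Dict[str, int]
) -> Dict[str, int]:
    """For each DAG, start queued runs only while its max_active_runs is not reached."""
    started: Dict[str, int] = {}
    for dag_id, count in Counter(queued).items():
        free_slots = max(max_active_runs[dag_id] - running.get(dag_id, 0), 0)
        started[dag_id] = min(count, free_slots)
    return started


print(first_n_overall(queued, limit=10))  # {'dag1': 8, 'dag2': 2}
limits = {"dag1": 3, "dag2": 3, "dag3": 3}
print(per_dag(queued, running={"dag1": 1}, max_active_runs=limits))
# {'dag1': 2, 'dag2': 3, 'dag3': 3}
```

The first strategy never reaches dag3 in this loop, while the per-DAG strategy gives every DAG its own budget.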
   




