You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/11 18:34:17 UTC
[GitHub] [airflow] RNHTTR opened a new issue, #28865: dag.test() hangs with deferrable operators if DAG is paused
RNHTTR opened a new issue, #28865:
URL: https://github.com/apache/airflow/issues/28865
### Apache Airflow version
2.5.0
### What happened
When trying to use [`dag.test()`](https://airflow.apache.org/docs/apache-airflow/stable/executor/debug.html) with a DAG that uses a deferrable operator, the deferrable operator task is immediately marked as having run successfully, but the test then hangs in perpetuity:
```
docker exec f988cd7d2786 python dags/example_dag_basic.py
...
[2023-01-11 18:06:21,799] {taskinstance.py:1386} INFO - Pausing task as DEFERRED. dag_id=test_dag, task_id=wait_some_seconds_async, execution_date=20230111T180621, start_date=
[2023-01-11 18:06:21,804] {dag.py:3626} INFO - wait_some_seconds_async ran successfully!
[2023-01-11 18:06:21,804] {dag.py:3629} INFO - *****************************************************
```
### What you think should happen instead
Either the test should run to completion, or the test should fail with a useful error message indicating that it failed because the DAG is paused.
### How to reproduce
Create the below DAG, pause it, then run `python <filename>.py`.
```
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.sensors.date_time import DateTimeSensorAsync
with DAG(
"test_dag",
default_args={
"depends_on_past": False,
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
},
description="A simple tutorial DAG",
schedule="@daily",
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
t0a = DateTimeSensorAsync(
task_id="wait_some_seconds_async",
target_time=datetime.now() + timedelta(seconds=5)
)
```
### Operating System
Debian GNU/Linux 11 (bullseye)
### Versions of Apache Airflow Providers
n/a
### Deployment
Astronomer
### Deployment details
Locally via `astro dev start` with the Astro CLI. Also [reproduced by another airflow user using Docker Compose](https://apache-airflow.slack.com/archives/CCR6P6JRL/p1673337666527429).
### Anything else
Also seeing the following error even when tasks (async or regular) run successfully with `dag.test()`:
```
[2023-01-11 18:30:50,644] {dagrun.py:879} WARNING - Failed to record first_task_scheduling_delay metric:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 867, in _emit_true_scheduling_delay_stats_for_finished_state
first_start_date = ordered_tis_by_start_date[0].start_date
IndexError: list index out of range
```
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] pankajastro commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused
Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1596598792
It occurs when a task resumes and dag is paused. i.e. either if a task is in a deferrable state and from deferrable it resume again. or if we have more than 1 try and the task fails in that case it stuck in re-try.
Few things we can do
- shall we raise an error if dag is paused?
- shall we unpaused the dag if dag is paused and then run the dag (We do this when we trigger dag from UI i.e manually)
- we should keep some map for tis state and set the TI state to success if a state change from deferrable to schedule similar keep count for try num for each ti?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] uranusjr commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused
Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1467612690
The error in `_emit_true_scheduling_delay_stats_for_finished_state` is not related and has been fixed in main, for what it is worth.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] RNHTTR commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused
Posted by "RNHTTR (via GitHub)" <gi...@apache.org>.
RNHTTR commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1470577163
> @RNHTTR Did you miss a `dag.test()` line in the reproducing DAG posted above? I can reproduce if I add it at the end of the file. Also it seems whether the DAG is paused or not is irrelevant to me, `dag.test()` always hangs.
Yes, I think it ought to have `dag.test()` in an `if __name__ == "__main__"` block
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kamalesh0406 commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused
Posted by GitBox <gi...@apache.org>.
kamalesh0406 commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1381480923
Hi, I am kind of unable to set up the dev env locally due to space issues. While I try to figure out a fix, it would be better for someone else to take up the task. I will try to take up another issue in the future once I have my setup fixed. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] uranusjr commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused
Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1379900888
Go ahead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] uranusjr commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused
Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1467648834
@RNHTTR Did you miss a `dag.test()` line in the reproducing DAG posted above?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] phanikumv commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused
Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1592470799
@pankajastro is looking into this issue
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kamalesh0406 commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused
Posted by GitBox <gi...@apache.org>.
kamalesh0406 commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1379895553
@potiuk I can look into this issue, can you assign it to me?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] collinmcnulty commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused
Posted by "collinmcnulty (via GitHub)" <gi...@apache.org>.
collinmcnulty commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1468815077
@uranusjr
```
import pendulum
from airflow.decorators import dag, task
from airflow.sensors.time_delta import TimeDeltaSensorAsync
@dag(
default_args={'retries': 2},
schedule_interval=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
)
def waiting_dag():
a = TimeDeltaSensorAsync(task_id="waiting",
delta=pendulum.duration(seconds=10))
waiting_dag()
```
And then I ran the command `airflow tasks test waiting_dag waiting`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] pankajastro commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused
Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1601113615
> A somewhat more general question: Is it possible to run a run without unpausing the DAG at all?
afaik In case of retry and deferable, it does not work. Currently, the test function loop which monitors dagrun submits TI's and assume that it would either fail or succeed. But does not care that it can again come to schedule from a deferred state. I have drafted a solution here https://github.com/apache/airflow/pull/32000 to handle these 2 cases. Please let me know how it is looking
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] uranusjr commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused
Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1467643923
@collinmcnulty Do you have a DAG to reproduce the issue? The DAG in the OP fnishes `airflow tasks test` as expected.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] uranusjr commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused
Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1597536569
A somewhat more general question: Is it possible to run a run without unpausing the DAG at all? Personally I think it would be very useful feature, enabling the workflow of writing a DAG (including a schedule), pause it (so it doesn’t run), test it, and only unpause when everything is validated. It would not only be useful for `test()`, but also trigger.
But if this feature is too involved to implement, the first option (error out) seems to most reasonable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] collinmcnulty commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused
Posted by "collinmcnulty (via GitHub)" <gi...@apache.org>.
collinmcnulty commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1464519371
It appears that there is a similar failure, probably the same root cause, for `tasks test`.
```
/usr/local/airflow$ airflow tasks test waiting_dag waiting
[2023-03-10 21:53:10,054] {dagbag.py:538} INFO - Filling up the DagBag from /usr/local/airflow/dags
/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py:159 RemovedInAirflow3Warning: Calling `DAG.create_dagrun()` without an explicit data interval is deprecated
[2023-03-10 21:53:10,125] {taskinstance.py:1083} INFO - Dependencies all met for <TaskInstance: waiting_dag.waiting __airflow_temporary_run_2023-03-10T21:53:10.094786+00:00__ [None]>
[2023-03-10 21:53:10,132] {taskinstance.py:1083} INFO - Dependencies all met for <TaskInstance: waiting_dag.waiting __airflow_temporary_run_2023-03-10T21:53:10.094786+00:00__ [None]>
[2023-03-10 21:53:10,132] {taskinstance.py:1279} INFO -
--------------------------------------------------------------------------------
[2023-03-10 21:53:10,132] {taskinstance.py:1280} INFO - Starting attempt 1 of 3
[2023-03-10 21:53:10,132] {taskinstance.py:1281} INFO -
--------------------------------------------------------------------------------
[2023-03-10 21:53:10,133] {taskinstance.py:1300} INFO - Executing <Task(TimeDeltaSensorAsync): waiting> on 2023-03-10T21:53:10.094780+00:00
[2023-03-10 21:53:10,150] {taskinstance.py:1507} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=waiting_dag
AIRFLOW_CTX_TASK_ID=waiting
AIRFLOW_CTX_EXECUTION_DATE=2023-03-10T21:53:10.094780+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=__airflow_temporary_run_2023-03-10T21:53:10.094786+00:00__
[2023-03-10 21:53:10,153] {taskinstance.py:1382} INFO - Pausing task as DEFERRED. dag_id=waiting_dag, task_id=waiting, execution_date=20230310T215310, start_date=
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org