Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/11 18:34:17 UTC

[GitHub] [airflow] RNHTTR opened a new issue, #28865: dag.test() hangs with deferrable operators if DAG is paused

RNHTTR opened a new issue, #28865:
URL: https://github.com/apache/airflow/issues/28865

   ### Apache Airflow version
   
   2.5.0
   
   ### What happened
   
   When trying to use [`dag.test()`](https://airflow.apache.org/docs/apache-airflow/stable/executor/debug.html) with a DAG that uses a deferrable operator, the deferrable operator task is immediately marked as having run successfully, but the test then hangs indefinitely:
   
   ```
   docker exec f988cd7d2786 python dags/example_dag_basic.py
   
   ...
   
   [2023-01-11 18:06:21,799] {taskinstance.py:1386} INFO - Pausing task as DEFERRED. dag_id=test_dag, task_id=wait_some_seconds_async, execution_date=20230111T180621, start_date=
   [2023-01-11 18:06:21,804] {dag.py:3626} INFO - wait_some_seconds_async ran successfully!
   [2023-01-11 18:06:21,804] {dag.py:3629} INFO - *****************************************************
   ```
   
   ### What you think should happen instead
   
   Either the test should run to completion, or the test should fail with a useful error message indicating that it failed because the DAG is paused.
   
   ### How to reproduce
   
   Create the below DAG, pause it, then run `python <filename>.py`.
   
   ```python
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.sensors.date_time import DateTimeSensorAsync
   
   with DAG(
       "test_dag",
       default_args={
           "depends_on_past": False,
           "email": ["airflow@example.com"],
           "email_on_failure": False,
           "email_on_retry": False,
           "retries": 1,
       },
       description="A simple tutorial DAG",
       schedule="@daily",
       start_date=datetime(2021, 1, 1),
       catchup=False,
       tags=["example"],
   ) as dag:
       t0a = DateTimeSensorAsync(
           task_id="wait_some_seconds_async",
           target_time=datetime.now() + timedelta(seconds=5),
       )
   
   if __name__ == "__main__":
       # Run the whole DAG in-process when the file is executed directly.
       dag.test()
   ```
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   n/a
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   Locally via `astro dev start` with the Astro CLI. Also [reproduced by another airflow user using Docker Compose](https://apache-airflow.slack.com/archives/CCR6P6JRL/p1673337666527429).
   
   ### Anything else
   
   Also seeing the following warning, with a traceback, even when tasks (async or regular) run successfully with `dag.test()`:
   
   ```
   [2023-01-11 18:30:50,644] {dagrun.py:879} WARNING - Failed to record first_task_scheduling_delay metric:
   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 867, in _emit_true_scheduling_delay_stats_for_finished_state
       first_start_date = ordered_tis_by_start_date[0].start_date
   IndexError: list index out of range
   
   ```
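The traceback shows `ordered_tis_by_start_date[0]` being read from an empty list. A minimal sketch of the kind of guard that avoids this, using a hypothetical `first_start_date` helper over plain `datetime` values rather than Airflow task instances (this is not the fix that was merged upstream):

```python
from datetime import datetime
from typing import Iterable, Optional


def first_start_date(start_dates: Iterable[Optional[datetime]]) -> Optional[datetime]:
    """Return the earliest known start date, or None if there is none.

    Hypothetical helper: per the traceback, the real code indexes [0] into a
    list ordered by start_date, which raises IndexError when it is empty.
    """
    known = [d for d in start_dates if d is not None]
    if not known:
        # Nothing to measure: skip the metric instead of raising IndexError.
        return None
    return min(known)


print(first_start_date([]))  # None
print(first_start_date([None, datetime(2023, 1, 11, 18, 30), datetime(2023, 1, 11, 18, 6)]))
# 2023-01-11 18:06:00
```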
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pankajastro commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1596598792

   It occurs when a task needs to run again while the DAG is paused, i.e. either when a task is deferred and then resumes from the deferred state, or when a task has more than one try and fails, in which case it gets stuck in retry.
   
   A few things we could do:
   - Should we raise an error if the DAG is paused?
   - Should we unpause the DAG if it is paused and then run it? (We do this when a DAG is triggered manually from the UI.)
   - Should we keep a map of TI states, and set a TI's state to success if it changes from deferred to scheduled? Similarly, should we keep a count of the try number for each TI?
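The third option amounts to per-TI bookkeeping. A rough sketch with hypothetical names (`TiTracker`, `needs_rerun`) and plain strings standing in for TI states: remember each TI's last seen state and try number, so a test loop can tell when a TI has come back from `deferred` or has started a new try, and therefore has to be executed again.

```python
from dataclasses import dataclass, field
from typing import Dict, Tuple


@dataclass
class TiTracker:
    """Hypothetical per-TI bookkeeping for a dag.test()-style loop (not Airflow API)."""

    # task_id -> (last seen state, last seen try number)
    seen: Dict[str, Tuple[str, int]] = field(default_factory=dict)

    def needs_rerun(self, task_id: str, state: str, try_number: int) -> bool:
        """Record the TI's current state and try number.

        Returns True when the TI has resumed from "deferred" back to
        "scheduled", or when its try number has gone up (a retry), i.e. the
        cases that a loop running each TI only once would miss.
        """
        previous = self.seen.get(task_id)
        self.seen[task_id] = (state, try_number)
        if previous is None:
            return False  # first sighting: nothing to compare against yet
        prev_state, prev_try = previous
        resumed = prev_state == "deferred" and state == "scheduled"
        retried = try_number > prev_try
        return resumed or retried


tracker = TiTracker()
print(tracker.needs_rerun("wait_some_seconds_async", "scheduled", 1))  # False
print(tracker.needs_rerun("wait_some_seconds_async", "deferred", 1))   # False
print(tracker.needs_rerun("wait_some_seconds_async", "scheduled", 1))  # True
```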



[GitHub] [airflow] uranusjr commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1467612690

   The error in `_emit_true_scheduling_delay_stats_for_finished_state` is not related and has been fixed in main, for what it is worth.



[GitHub] [airflow] RNHTTR commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused

Posted by "RNHTTR (via GitHub)" <gi...@apache.org>.
RNHTTR commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1470577163

   > @RNHTTR Did you miss a `dag.test()` line in the reproducing DAG posted above? I can reproduce if I add it at the end of the file. Also it seems whether the DAG is paused or not is irrelevant to me, `dag.test()` always hangs.
   
   Yes, I think it ought to have `dag.test()` in an `if __name__ == "__main__":` block.



[GitHub] [airflow] kamalesh0406 commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused

Posted by GitBox <gi...@apache.org>.
kamalesh0406 commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1381480923

   Hi, I am kind of unable to set up the dev env locally due to space issues. While I try to figure out a fix, it would be better for someone else to take up the task. I will try to take up another issue in the future once I have my setup fixed. Thanks! 



[GitHub] [airflow] uranusjr commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1379900888

   Go ahead.



[GitHub] [airflow] uranusjr commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1467648834

   @RNHTTR Did you miss a `dag.test()` line in the reproducing DAG posted above?



[GitHub] [airflow] phanikumv commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1592470799

   @pankajastro is looking into this issue



[GitHub] [airflow] kamalesh0406 commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused

Posted by GitBox <gi...@apache.org>.
kamalesh0406 commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1379895553

   @potiuk I can look into this issue, can you assign it to me?



[GitHub] [airflow] collinmcnulty commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused

Posted by "collinmcnulty (via GitHub)" <gi...@apache.org>.
collinmcnulty commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1468815077

   @uranusjr 
   
   ```python
   import pendulum
   from airflow.decorators import dag
   from airflow.sensors.time_delta import TimeDeltaSensorAsync
   
   
   @dag(
       default_args={"retries": 2},
       schedule_interval=None,
       start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
       catchup=False,
   )
   def waiting_dag():
       a = TimeDeltaSensorAsync(task_id="waiting", delta=pendulum.duration(seconds=10))
   
   
   waiting_dag()
   ```
   
   And then I ran the command `airflow tasks test waiting_dag waiting`



[GitHub] [airflow] pankajastro commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1601113615

   > A somewhat more general question: Is it possible to run a run without unpausing the DAG at all?
   
   AFAIK, in the case of retries and deferrable operators, it does not work. Currently, the loop in the test function that monitors the DAG run submits TIs and assumes each one will either fail or succeed; it does not account for a TI coming back to the scheduled state from the deferred state. I have drafted a solution in https://github.com/apache/airflow/pull/32000 to handle these two cases. Please let me know what you think.
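The failure mode described here can be illustrated with a toy model (plain Python, not Airflow's `dag.test()` code): a loop that submits each TI once and assumes it then succeeds or fails never ends once a TI goes to `deferred` or `up_for_retry`, whereas a loop that resubmits any TI found back in `scheduled` completes. The task `flaky`, the scripted outcomes, and the `max_loops` cap are all assumptions added so the example terminates.

```python
from typing import Dict, List

# Toy model, not Airflow code: the state each task ends up in after its
# 1st, 2nd, ... execution. The deferrable sensor first defers and only
# succeeds when executed again after resuming; "flaky" (hypothetical)
# fails its first try and succeeds on the retry.
SCRIPTS: Dict[str, List[str]] = {
    "wait_some_seconds_async": ["deferred", "success"],
    "flaky": ["up_for_retry", "success"],
}
FINISHED = {"success", "failed"}


def all_finished(states: Dict[str, str]) -> bool:
    return all(state in FINISHED for state in states.values())


def tick(states: Dict[str, str]) -> None:
    """Stand-in for the trigger firing / retry delay elapsing."""
    for task_id, state in states.items():
        if state in ("deferred", "up_for_retry"):
            states[task_id] = "scheduled"


def run(resubmit: bool, max_loops: int = 10) -> str:
    states = {task_id: "scheduled" for task_id in SCRIPTS}
    attempts = {task_id: 0 for task_id in SCRIPTS}
    for _ in range(max_loops):
        if all_finished(states):
            break
        for task_id, state in states.items():
            # Naive loop: submit each TI only once. Fixed loop: resubmit
            # any TI that is back in the scheduled state.
            if state == "scheduled" and (resubmit or attempts[task_id] == 0):
                states[task_id] = SCRIPTS[task_id][attempts[task_id]]
                attempts[task_id] += 1
        tick(states)
    # Without the max_loops cap, the naive loop would spin here forever.
    return "finished" if all_finished(states) else "stuck"


print(run(resubmit=False))  # stuck
print(run(resubmit=True))   # finished
```

The cap only exists so that the naive variant returns a value instead of hanging the way `dag.test()` does.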



[GitHub] [airflow] uranusjr commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1467643923

   @collinmcnulty Do you have a DAG to reproduce the issue? The DAG in the OP finishes `airflow tasks test` as expected.



[GitHub] [airflow] uranusjr commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1597536569

   A somewhat more general question: Is it possible to run a run without unpausing the DAG at all? Personally I think it would be a very useful feature, enabling the workflow of writing a DAG (including a schedule), pausing it (so it doesn't run), testing it, and only unpausing it when everything is validated. It would be useful not only for `test()`, but also for triggering.
   
   But if this feature is too involved to implement, the first option (error out) seems the most reasonable.
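If erroring out is the route taken, the check itself is small. A minimal sketch of the fail-fast behavior requested in the issue, with a hypothetical `DagPausedError` and `check_dag_can_be_tested` (neither is part of Airflow). Here `is_paused` is passed in directly; in Airflow 2.x it would presumably be read from the metadata database, for example via `DAG.get_is_paused()`.

```python
class DagPausedError(RuntimeError):
    """Hypothetical error type, not part of Airflow."""


def check_dag_can_be_tested(dag_id: str, is_paused: bool) -> None:
    """Raise a descriptive error up front instead of letting the test loop hang."""
    if is_paused:
        raise DagPausedError(
            f"DAG {dag_id!r} is paused, so tasks that defer or retry may never "
            "resume and dag.test() would not finish. Unpause the DAG and try again."
        )


try:
    check_dag_can_be_tested("test_dag", is_paused=True)
except DagPausedError as exc:
    print(exc)
```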



[GitHub] [airflow] collinmcnulty commented on issue #28865: dag.test() hangs with deferrable operators if DAG is paused

Posted by "collinmcnulty (via GitHub)" <gi...@apache.org>.
collinmcnulty commented on issue #28865:
URL: https://github.com/apache/airflow/issues/28865#issuecomment-1464519371

   It appears that there is a similar failure, probably with the same root cause, for `airflow tasks test`.
   
   ```
   /usr/local/airflow$ airflow tasks test waiting_dag waiting
   [2023-03-10 21:53:10,054] {dagbag.py:538} INFO - Filling up the DagBag from /usr/local/airflow/dags
   /usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py:159 RemovedInAirflow3Warning: Calling `DAG.create_dagrun()` without an explicit data interval is deprecated
   [2023-03-10 21:53:10,125] {taskinstance.py:1083} INFO - Dependencies all met for <TaskInstance: waiting_dag.waiting __airflow_temporary_run_2023-03-10T21:53:10.094786+00:00__ [None]>
   [2023-03-10 21:53:10,132] {taskinstance.py:1083} INFO - Dependencies all met for <TaskInstance: waiting_dag.waiting __airflow_temporary_run_2023-03-10T21:53:10.094786+00:00__ [None]>
   [2023-03-10 21:53:10,132] {taskinstance.py:1279} INFO - 
   --------------------------------------------------------------------------------
   [2023-03-10 21:53:10,132] {taskinstance.py:1280} INFO - Starting attempt 1 of 3
   [2023-03-10 21:53:10,132] {taskinstance.py:1281} INFO - 
   --------------------------------------------------------------------------------
   [2023-03-10 21:53:10,133] {taskinstance.py:1300} INFO - Executing <Task(TimeDeltaSensorAsync): waiting> on 2023-03-10T21:53:10.094780+00:00
   [2023-03-10 21:53:10,150] {taskinstance.py:1507} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=waiting_dag
   AIRFLOW_CTX_TASK_ID=waiting
   AIRFLOW_CTX_EXECUTION_DATE=2023-03-10T21:53:10.094780+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=__airflow_temporary_run_2023-03-10T21:53:10.094786+00:00__
   [2023-03-10 21:53:10,153] {taskinstance.py:1382} INFO - Pausing task as DEFERRED. dag_id=waiting_dag, task_id=waiting, execution_date=20230310T215310, start_date=
   ```

