Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/12/22 13:49:03 UTC

[GitHub] [airflow] andreychernih opened a new issue #20460: TimeDeltaSensorAsync task is failing occasionally

andreychernih opened a new issue #20460:
URL: https://github.com/apache/airflow/issues/20460


   ### Apache Airflow version
   
   2.2.2
   
   ### What happened
   
   I have a simple TimeDeltaSensorAsync task that is the very first task in the DAG:
   
   ```python
   from datetime import timedelta

   from airflow.sensors.time_delta import TimeDeltaSensorAsync

   # Wait 6 hours before running the rest of the DAG.
   delay_sensor = TimeDeltaSensorAsync(task_id="wait", delta=timedelta(hours=6))

   # ...

   delay_sensor.set_downstream(download_task)
   download_task.set_downstream(process_task)
   ```
   
   The intention of this task is simply to wait 6 hours before proceeding with the rest of the DAG, so I expect it to always be in either the "deferred" or the "success" state. In almost 50% of cases across my DAGs in production, however, this task fails and ends up in the "up_for_retry" state. All of my components, the scheduler and the triggerer, are up and running; they are healthy and not restarting. This is a production system where all other DAGs and tasks are working correctly.
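   
   For reference, here is a minimal, self-contained sketch of the DAG shape. It is not the actual production code: the schedule and the PythonOperator placeholders are assumptions, with the task ids taken from the logs below.
   
   ```python
   from datetime import datetime, timedelta

   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.sensors.time_delta import TimeDeltaSensorAsync

   with DAG(
       dag_id="integration-choice-advantage-810",
       start_date=datetime(2021, 12, 1),
       schedule_interval="0 8 * * *",  # assumption: daily at 08:00 UTC, matching the run_id in the logs
       catchup=False,
   ) as dag:
       # Wait 6 hours before running anything else in the DAG.
       delay_sensor = TimeDeltaSensorAsync(task_id="wait", delta=timedelta(hours=6))

       # Hypothetical placeholders for the real work; task ids taken from the logs below.
       download_task = PythonOperator(task_id="download_data", python_callable=lambda: None)
       process_task = PythonOperator(task_id="process", python_callable=lambda: None)

       delay_sensor >> download_task >> process_task
   ```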
   
   The only suspicious lines I found in the logs related to this task are:
   
   ```
   [2021-12-22 13:41:06,921] {base_executor.py:85} ERROR - could not queue task TaskInstanceKey(dag_id='integration-choice-advantage-810', task_id='wait', run_id='scheduled__2021-12-20T08:00:00+00:00', try_number=2)
   ...
   [2021-12-22 13:41:06,932] {scheduler_job.py:572} ERROR - Executor reports task instance <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> finished (success) although the task says its queued. (Info: None) Was the task killed externally?
   [2021-12-22 13:41:06,936] {taskinstance.py:1705} ERROR - Executor reports task instance <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> finished (success) although the task says its queued. (Info: None) Was the task killed externally?
   ```
   
   ![image](https://user-images.githubusercontent.com/131281/147102191-29b22e70-bda6-41c9-a7a9-7e0cacdd8de2.png)
   
   
   Scheduler log:
   
   ```
   [2021-12-22 13:41:06,913] {scheduler_job.py:288} INFO - 1 tasks up for execution:
   	<TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [scheduled]>
   [2021-12-22 13:41:06,916] {scheduler_job.py:317} INFO - Figuring out tasks to run in Pool(name=default_pool) with 1280 open slots and 1 task instances ready to be queued
   [2021-12-22 13:41:06,916] {scheduler_job.py:345} INFO - DAG integration-choice-advantage-810 has 0/2 running and queued tasks
   [2021-12-22 13:41:06,918] {scheduler_job.py:410} INFO - Setting the following tasks to queued state:
   	<TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [scheduled]>
   [2021-12-22 13:41:06,921] {scheduler_job.py:450} INFO - Sending TaskInstanceKey(dag_id='integration-choice-advantage-810', task_id='wait', run_id='scheduled__2021-12-20T08:00:00+00:00', try_number=2) to executor with priority 3 and queue default
   [2021-12-22 13:41:06,921] {base_executor.py:85} ERROR - could not queue task TaskInstanceKey(dag_id='integration-choice-advantage-810', task_id='wait', run_id='scheduled__2021-12-20T08:00:00+00:00', try_number=2)
   [2021-12-22 13:41:06,923] {base_executor.py:150} DEBUG - 1 running task instances
   [2021-12-22 13:41:06,923] {base_executor.py:151} DEBUG - 0 in queue
   [2021-12-22 13:41:06,923] {base_executor.py:152} DEBUG - 19 open slots
   [2021-12-22 13:41:06,923] {base_executor.py:161} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync method
   [2021-12-22 13:41:06,924] {base_executor.py:198} DEBUG - Changing state: TaskInstanceKey(dag_id='integration-choice-advantage-810', task_id='wait', run_id='scheduled__2021-12-20T08:00:00+00:00', try_number=2)
   [2021-12-22 13:41:06,924] {scheduler_job.py:504} INFO - Executor reports execution of integration-choice-advantage-810.wait run_id=scheduled__2021-12-20T08:00:00+00:00 exited with status success for try_number 2
   [2021-12-22 13:41:06,932] {scheduler_job.py:547} INFO - TaskInstance Finished: dag_id=integration-choice-advantage-810, task_id=wait, run_id=scheduled__2021-12-20T08:00:00+00:00, run_start_date=2021-12-22 13:41:01.836808+00:00, run_end_date=None, run_duration=0.328748, state=queued, executor_state=success, try_number=2, max_tries=4, job_id=4517, pool=default_pool, queue=default, priority_weight=3, operator=TimeDeltaSensorAsync
   [2021-12-22 13:41:06,932] {scheduler_job.py:572} ERROR - Executor reports task instance <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> finished (success) although the task says its queued. (Info: None) Was the task killed externally?
   [2021-12-22 13:41:06,936] {taskinstance.py:1705} ERROR - Executor reports task instance <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> finished (success) although the task says its queued. (Info: None) Was the task killed externally?
   [2021-12-22 13:41:06,936] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> from DB
   [2021-12-22 13:41:06,943] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]>
   [2021-12-22 13:41:06,943] {taskinstance.py:2224} DEBUG - Task Duration set to 5.106632
   [2021-12-22 13:41:06,943] {taskinstance.py:1286} DEBUG - Clearing next_method and next_kwargs.
   [2021-12-22 13:41:06,944] {taskinstance.py:1270} INFO - Marking task as UP_FOR_RETRY. dag_id=integration-choice-advantage-810, task_id=wait, execution_date=20211220T080000, start_date=20211222T134101, end_date=20211222T134106
   ```
   
   Triggerer log:
   
   ```
   [2021-12-22 13:41:02,998] {triggerer_job.py:356} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2021-12-21T14:00:00+00:00> (ID 499) starting
   [2021-12-22 13:41:02,998] {triggerer_job.py:359} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2021-12-21T14:00:00+00:00> (ID 499) fired: TriggerEvent
   [2021-12-22 13:41:04,003] {triggerer_job.py:356} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2021-12-21T14:00:00+00:00> (ID 499) starting
   [2021-12-22 13:41:04,003] {triggerer_job.py:359} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2021-12-21T14:00:00+00:00> (ID 499) fired: TriggerEvent
   ```
   
   Task instance log:
   
   ```
   [2021-12-22, 13:41:01 UTC] {base_task_runner.py:63} DEBUG - Planning to run as the  user
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> from DB
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]>
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> dependency 'Task Instance State' PASSED: True, Task state queued was valid.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> dependency 'Task Instance Not Running' PASSED: True, Task is not in running state.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]>
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> dependency 'Pool Slots Available' PASSED: True, ('There are enough open slots in %s to execute the task', 'default_pool')
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [queued]>
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1241} INFO - 
   --------------------------------------------------------------------------------
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1242} INFO - Starting attempt 2 of 5
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1243} INFO - 
   --------------------------------------------------------------------------------
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1262} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2021-12-20 08:00:00+00:00
   [2021-12-22, 13:41:01 UTC] {standard_task_runner.py:52} INFO - Started process 12810 to run task
   [2021-12-22, 13:41:01 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'integration-choice-advantage-810', 'wait', 'scheduled__2021-12-20T08:00:00+00:00', '--job-id', '4517', '--raw', '--subdir', 'DAGS_FOLDER/airflow-dag.py', '--cfg-path', '/tmp/tmpxq9fz1pw', '--error-file', '/tmp/tmpc0cq8o26']
   [2021-12-22, 13:41:01 UTC] {standard_task_runner.py:77} INFO - Job 4517: Subtask wait
   [2021-12-22, 13:41:01 UTC] {cli_action_loggers.py:66} DEBUG - Calling callbacks: [<function default_action_log at 0x7ff646e4b670>]
   [2021-12-22, 13:41:01 UTC] {settings.py:210} DEBUG - Setting up DB connection pool (PID 12810)
   [2021-12-22, 13:41:01 UTC] {settings.py:267} DEBUG - settings.prepare_engine_args(): Using NullPool
   [2021-12-22, 13:41:01 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [running]> on host airflow-scheduler-589549ff66-vnps5
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [running]> from DB
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [running]>
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:790} DEBUG - Clearing XCom data
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:797} DEBUG - XCom data cleared
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1427} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=integration-choice-advantage-810
   AIRFLOW_CTX_TASK_ID=wait
   AIRFLOW_CTX_EXECUTION_DATE=2021-12-20T08:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-12-20T08:00:00+00:00
   [2021-12-22, 13:41:02 UTC] {__init__.py:146} DEBUG - Preparing lineage inlets and outlets
   [2021-12-22, 13:41:02 UTC] {__init__.py:190} DEBUG - inlets: [], outlets: []
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1340} INFO - Pausing task as DEFERRED. dag_id=integration-choice-advantage-810, task_id=wait, execution_date=20211220T080000, start_date=20211222T134101
   [2021-12-22, 13:41:02 UTC] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
   [2021-12-22, 13:41:02 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [running]> from DB
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 [deferred]>
   [2021-12-22, 13:41:02 UTC] {dagrun.py:619} DEBUG - number of tis tasks for <DagRun integration-choice-advantage-810 @ 2021-12-20 08:00:00+00:00: scheduled__2021-12-20T08:00:00+00:00, externally triggered: False>: 3 task(s)
   [2021-12-22, 13:41:02 UTC] {dagrun.py:634} DEBUG - number of scheduleable tasks for <DagRun integration-choice-advantage-810 @ 2021-12-20 08:00:00+00:00: scheduled__2021-12-20T08:00:00+00:00, externally triggered: False>: 2 task(s)
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.download_data scheduled__2021-12-20T08:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.download_data scheduled__2021-12-20T08:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'wait'}
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: integration-choice-advantage-810.download_data scheduled__2021-12-20T08:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'wait'}
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.download_data scheduled__2021-12-20T08:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.process scheduled__2021-12-20T08:00:00+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.process scheduled__2021-12-20T08:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'download_data'}
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1025} DEBUG - Dependencies not met for <TaskInstance: integration-choice-advantage-810.process scheduled__2021-12-20T08:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'download_data'}
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: integration-choice-advantage-810.process scheduled__2021-12-20T08:00:00+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
   [2021-12-22, 13:41:02 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```
   
   ### What you expected to happen
   
   The TimeDeltaSensorAsync task should never fail; it should simply wait for the configured delta and then succeed.
   
   ### How to reproduce
   
   _No response_
   
   ### Operating System
   
   Airflow Docker image
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] andrewgodwin commented on issue #20460: TimeDeltaSensorAsync task is failing occasionally

Posted by GitBox <gi...@apache.org>.
andrewgodwin commented on issue #20460:
URL: https://github.com/apache/airflow/issues/20460#issuecomment-1024498074


   Well, the first weird thing appears to be that the trigger is firing immediately, but I presume this is because TimeDeltaSensor measures the delta from the `execution_date` rather than from the actual current datetime, so the target moment must already be in the past (which is a confusing property of TimeDeltaSensor, if you ask me).
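   
   For illustration, here is that arithmetic as a small sketch, with the values reconstructed from the logs above under the assumption of a daily schedule at 08:00 UTC (inferred from the run_id `scheduled__2021-12-20T08:00:00+00:00` and the trigger moment `2021-12-21T14:00:00+00:00`):
   
   ```python
   from datetime import datetime, timedelta, timezone

   # Assumed values, reconstructed from the logs in the issue.
   data_interval_end = datetime(2021, 12, 21, 8, 0, tzinfo=timezone.utc)  # end of the run's data interval
   target_moment = data_interval_end + timedelta(hours=6)                 # 2021-12-21T14:00:00+00:00, matches the trigger log
   now = datetime(2021, 12, 22, 13, 41, tzinfo=timezone.utc)              # roughly when the triggerer picked it up

   print(target_moment < now)  # True -> the DateTimeTrigger's moment is already in the past, so it fires immediately
   ```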
   
   Past that, what's odd is that the trigger is getting called twice, which might indicate one of those race conditions that were being looked at. Neither of those explains why the task's state is getting stuck, though.




[GitHub] [airflow] Greetlist commented on issue #20460: TimeDeltaSensorAsync task is failing occasionally

Posted by GitBox <gi...@apache.org>.
Greetlist commented on issue #20460:
URL: https://github.com/apache/airflow/issues/20460#issuecomment-1023924535


   I had a similar problem in [#20308](https://github.com/apache/airflow/issues/20308) and fixed it in a somewhat hacky way. I also describe a more general solution in that issue, but I have not implemented it. Hope it helps you. @andreychernih




[GitHub] [airflow] potiuk commented on issue #20460: TimeDeltaSensorAsync task is failing occasionally

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #20460:
URL: https://github.com/apache/airflow/issues/20460#issuecomment-1024302976


   @andrewgodwin - does it ring a bell?

