Posted to commits@airflow.apache.org by "hussein-awala (via GitHub)" <gi...@apache.org> on 2023/08/22 21:07:01 UTC

[GitHub] [airflow] hussein-awala commented on pull request #32990: Timeout Airflow TI and Sensors when using deferable mode

hussein-awala commented on PR #32990:
URL: https://github.com/apache/airflow/pull/32990#issuecomment-1688933450

   I think it's ready :tada: 
   
   ## How it works:
   
   When we defer a task, we call the method `_trigger_timeout` to calculate the moment when the trigger should be cancelled, along with the reason for this timeout. In the base operator there are 2 possible reasons:
   - execution_timeout: the same `execution_timeout` used in a normal (non-deferred) task. The remaining time is calculated by subtracting the time already spent since `start_date` from the total `execution_timeout`, and the timeout moment is the current moment plus that remaining duration.
   - trigger_timeout: the timeout passed to the `defer` method; the timeout moment is the current moment plus this timeout duration.
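A minimal sketch of the two candidate moments described above, assuming timezone-aware datetimes. `candidate_timeouts` is a hypothetical helper for illustration, not Airflow's `_trigger_timeout`:

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


def candidate_timeouts(
    now: datetime,
    start_date: datetime,
    execution_timeout: Optional[timedelta],
    defer_timeout: Optional[timedelta],
) -> Dict[str, datetime]:
    """Map each applicable timeout reason to the absolute moment it fires."""
    candidates: Dict[str, datetime] = {}
    if execution_timeout is not None:
        # Budget left after the time already spent since start_date ...
        remaining = execution_timeout - (now - start_date)
        # ... added to "now", which works out to start_date + execution_timeout.
        candidates["execution_timeout"] = now + remaining
    if defer_timeout is not None:
        # The timeout given to defer() counts from the moment of deferral.
        candidates["trigger_timeout"] = now + defer_timeout
    return candidates


start = datetime(2023, 8, 22, 12, 0, tzinfo=timezone.utc)
now = start + timedelta(minutes=10)
# execution_timeout fires at 13:00 UTC, trigger_timeout at 12:15 UTC
example = candidate_timeouts(now, start, timedelta(hours=1), timedelta(minutes=5))
```

Note that the execution_timeout moment does not move when the task defers: it always lands on `start_date + execution_timeout`, so time spent before deferring is not granted again.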
   
   We choose the nearest (earliest) moment as the timeout, together with its corresponding reason, and pass both to the `TaskDeferred` exception; the TI then stores them in its row (I had to add a new column to simplify the implementation).
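One way to picture the selection and hand-off is the sketch below, assuming the candidate moments are already computed. `TaskDeferredSketch`, `TaskInstanceRow`, `defer` and the `trigger_timeout_reason` field are hypothetical stand-ins; the real exception, TI model and column name may differ:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, NoReturn, Optional, Tuple


class TaskDeferredSketch(Exception):
    """Hypothetical stand-in for TaskDeferred that also carries the chosen timeout."""

    def __init__(self, method_name: str, timeout: Optional[datetime], timeout_reason: Optional[str]) -> None:
        super().__init__(method_name)
        self.method_name = method_name
        self.timeout = timeout
        self.timeout_reason = timeout_reason


@dataclass
class TaskInstanceRow:
    """Hypothetical TI row; trigger_timeout_reason models the new column."""

    next_method: Optional[str] = None
    trigger_timeout: Optional[datetime] = None
    trigger_timeout_reason: Optional[str] = None


def nearest_timeout(candidates: Dict[str, datetime]) -> Tuple[Optional[datetime], Optional[str]]:
    """Pick the earliest moment and the reason that produced it."""
    if not candidates:
        return None, None
    reason = min(candidates, key=candidates.__getitem__)
    return candidates[reason], reason


def defer(method_name: str, candidates: Dict[str, datetime]) -> NoReturn:
    moment, reason = nearest_timeout(candidates)
    raise TaskDeferredSketch(method_name, moment, reason)


def record_deferral(ti: TaskInstanceRow, exc: TaskDeferredSketch) -> None:
    """Copy the timeout and its reason from the exception onto the TI row."""
    ti.next_method = exc.method_name
    ti.trigger_timeout = exc.timeout
    ti.trigger_timeout_reason = exc.timeout_reason


now = datetime(2023, 8, 22, 12, 10, tzinfo=timezone.utc)
ti = TaskInstanceRow()
try:
    defer("execute_complete", {
        "execution_timeout": now + timedelta(minutes=50),
        "trigger_timeout": now + timedelta(minutes=5),
    })
except TaskDeferredSketch as exc:
    record_deferral(ti, exc)
```

Storing the reason next to the moment is what lets the resumed task later decide which timeout actually fired, without recomputing anything.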
   
   When the timeout moment is reached, we cancel the trigger, set `next_method` to `__timeout__`, and resume the TI execution.
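The expiry step can be modelled as a periodic check over deferred TIs. This is a loose sketch, assuming a simple in-memory list of rows; `DeferredTI` and `expire_timed_out_triggers` are hypothetical and do not reproduce Airflow's scheduler code:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class DeferredTI:
    """Hypothetical, minimal view of a deferred task instance row."""

    task_id: str
    trigger_timeout: Optional[datetime]
    state: str = "deferred"
    trigger_id: Optional[int] = 1
    next_method: Optional[str] = "execute_complete"


def expire_timed_out_triggers(tis: List[DeferredTI], now: datetime) -> List[str]:
    """Cancel triggers whose timeout moment has passed and queue the TIs to resume."""
    expired: List[str] = []
    for ti in tis:
        if ti.state != "deferred" or ti.trigger_timeout is None:
            continue
        if ti.trigger_timeout <= now:
            ti.trigger_id = None  # detach the TI from its trigger so it can be cleaned up
            ti.next_method = "__timeout__"  # tells the resumed task why it woke up
            ti.state = "scheduled"  # hand the TI back to the scheduler
            expired.append(ti.task_id)
    return expired
```

TIs without a timeout, or whose timeout is still in the future, are left untouched and keep waiting on their trigger.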
   
   When the task runner or the `resume_execution` method detects that `next_method` is `__timeout__`, it calls the method `handle_trigger_timeout`. If the reason is execution_timeout, this method calls `on_kill` and raises `AirflowTaskTimeout`; if the reason is trigger_timeout, it calls the method `on_defer_timeout` and returns its value. For any other reason it raises `AirflowException`, in case the user adds new reasons without handling them.
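The dispatch just described can be sketched as follows. The exception classes are local stand-ins (not imported from Airflow, whose real class hierarchy may differ), and `OperatorSketch` and `resume_sketch` are hypothetical names used only to show the control flow:

```python
from typing import Any


class AirflowException(Exception):
    """Local stand-in, not imported from Airflow."""


class AirflowTaskTimeout(AirflowException):
    """Local stand-in."""


class AirflowDeferTimeout(AirflowException):
    """Local stand-in for the exception proposed in this PR."""


class OperatorSketch:
    def __init__(self) -> None:
        self.killed = False

    def execute_complete(self) -> Any:
        return "trigger fired"

    def on_kill(self) -> None:
        self.killed = True  # a real operator would cancel external work here

    def on_defer_timeout(self) -> Any:
        raise AirflowDeferTimeout("Trigger timeout reached")

    def handle_trigger_timeout(self, reason: str) -> Any:
        if reason == "execution_timeout":
            self.on_kill()
            raise AirflowTaskTimeout("execution_timeout reached while deferred")
        if reason == "trigger_timeout":
            return self.on_defer_timeout()
        # Guard against custom reasons that nobody handles.
        raise AirflowException(f"Unhandled trigger timeout reason: {reason!r}")


def resume_sketch(op: OperatorSketch, next_method: str, reason: str) -> Any:
    """Dispatch on next_method, as the resumed TI would."""
    if next_method == "__timeout__":
        return op.handle_trigger_timeout(reason)
    return getattr(op, next_method)()
```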
   
   By default, the method `on_defer_timeout` raises `AirflowDeferTimeout`, but the user can override it to:
   - raise a different exception
   - return a value (it is treated like a normal value returned from the `execute` method and pushed to XCom)
   - or do something and then re-defer the task. This can be very helpful when we want to defer for X seconds without implementing the wait explicitly in the trigger, and without failing the task when the timeout is reached.
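The three override options can be illustrated with the sketch below. Everything here is hypothetical: the classes are local stand-ins, the `xcom` dict stands in for XCom storage, and `check_condition` is an invented placeholder for whatever the task is waiting on:

```python
from typing import Any, Dict


class AirflowDeferTimeout(Exception):
    """Local stand-in for the exception proposed in this PR."""


class TaskDeferredSketch(Exception):
    """Local stand-in for a request to defer the task again."""

    def __init__(self, method_name: str, timeout_seconds: float) -> None:
        super().__init__(method_name)
        self.method_name = method_name
        self.timeout_seconds = timeout_seconds


class BaseSketch:
    def on_defer_timeout(self) -> Any:
        # Default behaviour: fail with AirflowDeferTimeout.
        raise AirflowDeferTimeout("deferral timed out")


class ReturnOnTimeout(BaseSketch):
    def on_defer_timeout(self) -> Any:
        # Returned like a value from execute(), so it ends up in XCom.
        return {"status": "timed_out"}


class PollEveryMinute(BaseSketch):
    """Re-defers for 60 s on each timeout until a condition holds or polls run out."""

    def __init__(self, max_polls: int) -> None:
        self.max_polls = max_polls
        self.polls = 0

    def check_condition(self) -> bool:
        return False  # hypothetical check, e.g. querying an external system

    def on_defer_timeout(self) -> Any:
        self.polls += 1
        if self.check_condition():
            return "done"
        if self.polls >= self.max_polls:
            raise AirflowDeferTimeout(f"gave up after {self.polls} polls")
        raise TaskDeferredSketch("execute_complete", timeout_seconds=60)


def run_timeout_hook(op: BaseSketch, xcom: Dict[str, Any]) -> str:
    """Call on_defer_timeout and map its outcome to a simplified TI state."""
    try:
        value = op.on_defer_timeout()
    except TaskDeferredSketch:
        return "deferred"
    except AirflowDeferTimeout:
        return "failed"
    xcom["return_value"] = value
    return "success"
```

The `PollEveryMinute` case is the "defer for X seconds" pattern: the trigger itself never needs to know about the wait, because each timeout simply re-defers until the budget of polls is spent.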
   
   ### What about base sensor?
   
   The base sensor overrides two methods: `_trigger_timeout`, to compare the candidate moment with the sensor's own `timeout` and use the reason `sensor_timeout` if that is the nearest moment; and `handle_trigger_timeout`, to handle this new reason by raising `AirflowSensorTimeout`. Once `AirflowSensorTimeout` is raised, the TI is marked as failed regardless of the number of remaining attempts.
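A sketch of the sensor override, assuming the sensor timeout is measured from `start_date` (a simplification) and that the base class only needs its timeout-moment logic for this illustration. All classes are hypothetical stand-ins, and the base `handle_trigger_timeout` omits the execution_timeout and trigger_timeout branches for brevity:

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional, Tuple


class AirflowException(Exception):
    """Local stand-in, not imported from Airflow."""


class AirflowSensorTimeout(AirflowException):
    """Local stand-in."""


class OperatorSketch:
    def __init__(self, start_date: datetime, execution_timeout: Optional[timedelta] = None) -> None:
        self.start_date = start_date
        self.execution_timeout = execution_timeout

    def _trigger_timeout(
        self, now: datetime, defer_timeout: Optional[timedelta]
    ) -> Tuple[Optional[datetime], Optional[str]]:
        candidates: Dict[str, datetime] = {}
        if self.execution_timeout is not None:
            candidates["execution_timeout"] = self.start_date + self.execution_timeout
        if defer_timeout is not None:
            candidates["trigger_timeout"] = now + defer_timeout
        if not candidates:
            return None, None
        reason = min(candidates, key=candidates.__getitem__)
        return candidates[reason], reason

    def handle_trigger_timeout(self, reason: str) -> Any:
        # execution_timeout / trigger_timeout branches omitted in this sketch.
        raise AirflowException(f"Unhandled trigger timeout reason: {reason!r}")


class SensorSketch(OperatorSketch):
    def __init__(self, start_date: datetime, timeout: timedelta, **kwargs: Any) -> None:
        super().__init__(start_date, **kwargs)
        self.timeout = timeout

    def _trigger_timeout(self, now, defer_timeout):
        moment, reason = super()._trigger_timeout(now, defer_timeout)
        sensor_moment = self.start_date + self.timeout
        # Keep the base result unless the sensor's own timeout comes first.
        if moment is None or sensor_moment < moment:
            return sensor_moment, "sensor_timeout"
        return moment, reason

    def handle_trigger_timeout(self, reason: str) -> Any:
        if reason == "sensor_timeout":
            raise AirflowSensorTimeout("Sensor timed out while deferred")
        return super().handle_trigger_timeout(reason)


def state_after_error(exc: BaseException, retries_left: int) -> str:
    """AirflowSensorTimeout fails the TI outright; other errors may retry."""
    if isinstance(exc, AirflowSensorTimeout):
        return "failed"
    return "up_for_retry" if retries_left > 0 else "failed"
```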
   
   ### What's next?
   
   Once this PR is merged, we can override the method `handle_trigger_timeout` in the operators that pass a `timeout` to the `defer` method, as a workaround for this bug, by simply raising `AirflowTaskTimeout` or `AirflowSensorTimeout`. Airflow 2.7.1 will use this method so that timeouts are handled identically in sync and async modes. Older versions will skip it, so the workaround is still safe there, but on those versions the task will raise `TaskDeferralError` instead and the `on_kill` method will not be called. That is sufficient for backward compatibility.
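The workaround and its behaviour on new versus older Airflow versions can be modelled roughly as below. This is a hypothetical sketch: the exceptions are local stand-ins, the `reason` parameter of the override is an assumed signature, and the `has_timeout_hook` flag is an invented switch standing in for "Airflow version calls `handle_trigger_timeout`":

```python
from typing import Any


class AirflowTaskTimeout(Exception):
    """Local stand-in, not imported from Airflow."""


class TaskDeferralError(Exception):
    """Local stand-in for the error older versions raise when a deferral times out."""


class ProviderOperatorSketch:
    """Hypothetical provider operator that passes `timeout` to defer()."""

    def handle_trigger_timeout(self, reason: str) -> Any:
        # Workaround: treat any trigger timeout as a task timeout.
        raise AirflowTaskTimeout(f"Deferral timed out ({reason})")


def resume_after_timeout(op: ProviderOperatorSketch, has_timeout_hook: bool, reason: str) -> Any:
    """Model how a version with or without the new hook reacts to the timeout."""
    if has_timeout_hook:
        # Versions that know about the hook call the override.
        return op.handle_trigger_timeout(reason)
    # Older versions never call the override, so the task fails with
    # TaskDeferralError instead.
    raise TaskDeferralError("Trigger timeout")
```

On older versions the override is just an unused extra method, which is why adding it to providers does not break backward compatibility.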
   
   closes: #32638
   closes: #32580
   closes: #19929
   
   I hope we can merge this before 2.7.1.

