Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/28 19:04:08 UTC

[GitHub] [airflow] vgupta3 opened a new issue, #26760: DAG on_failure_callback uses wrong context

vgupta3 opened a new issue, #26760:
URL: https://github.com/apache/airflow/issues/26760

   ### Apache Airflow version
   
   2.4.0
   
   ### What happened
   
   When a task fails in a DAG, the on_failure_callback [registered while creating the dag](https://github.com/apache/airflow/blob/21c40cd4cc7830bc17a4180d8dd0fa2cc4ed9cea/airflow/models/dag.py#L387) is triggered using the context of a [random task instance](https://github.com/apache/airflow/blob/21c40cd4cc7830bc17a4180d8dd0fa2cc4ed9cea/airflow/models/dag.py#L1296).
   
   ### What you think should happen instead
   
   The expectation is that one of the task instances that caused the dag failure should be used instead of a random task instance.
   
   
   ### How to reproduce
   
   Run the DAG below.
   
   ```python
   import datetime
   from airflow.models.dag import DAG
   from airflow.operators.python import PythonOperator
   from airflow.operators.empty import EmptyOperator
   
   def all_bad():
       raise Exception("I have failed")
   
   def all_good():
       print("ALL GOOD")
   
   def failure_callback_dag(context):
       print("Inside failure_callback_dag")
       print(context["task_instance"])
       print(context["task"])
   
   with DAG(
           dag_id = "test_dag",
           schedule_interval=None,
           start_date=datetime.datetime(2021, 1, 1),
           catchup=False,
           on_failure_callback=failure_callback_dag
       ) as dag:
   
       start = EmptyOperator(
           task_id="start"
       )
       
       fail = PythonOperator(
           provide_context = True,
           task_id = "fail",
           python_callable = all_bad
       )
   
       passs = PythonOperator(
           provide_context = True,
           task_id = "pass",
           python_callable = all_good
       )
   
       start >> [passs, fail]
   
   ```
   
   From the dag processor logs:
   
   The context is from the task instance that has succeeded.
   ```
   [2022-09-28T18:28:14.465+0000] {logging_mixin.py:117} INFO - [2022-09-28T18:28:14.463+0000] {dag.py:1292} INFO - Executing dag callback function: <function failure_callback_dag at 0x7fd17ca18560>
   [2022-09-28T18:28:14.943+0000] {logging_mixin.py:117} INFO - Inside failure_callback_dag
   [2022-09-28T18:28:14.944+0000] {logging_mixin.py:117} INFO - <TaskInstance: test_dag.pass manual__2022-09-28T18:27:59.612118+00:00 [success]>
   [2022-09-28T18:28:14.944+0000] {logging_mixin.py:117} INFO - <Task(PythonOperator): pass>
   ```
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Versions of Apache Airflow Providers
   
   Default providers that are present in the official airflow docker image.
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   I'm not sure whether this is expected behaviour; in case it is, it needs to be documented in [Callbacks](https://airflow.apache.org/docs/apache-airflow/2.4.0/logging-monitoring/callbacks.html).
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [I] DAG on_failure_callback uses wrong context [airflow]

Posted by "ttzhou (via GitHub)" <gi...@apache.org>.
ttzhou commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-2028793383

   Chiming in: what would also be helpful (IMO) in the current web docs is a list of the context variables available when the callback is called in the various situations, e.g.:
   
   DAG level, failure
   DAG level, success
   Task level, failure
   Task level, success
   DAG level, SLA miss
   
   I can dig through the code to find this eventually, but I think it would be helpful to include explicitly on the callback page, or at least a link to where it can be found.
   
   If this makes sense, I could try and take a stab at a doc PR
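
Until the docs list these, one way to see what a given callback receives is to log the keys of the context it is handed. This is a minimal sketch, assuming only that the context can be iterated like a mapping (the thread already indexes it as `context["task_instance"]`); the function name `log_context_keys` is hypothetical.

```python
def log_context_keys(context):
    """Print and return the sorted names of the variables in a callback context."""
    keys = sorted(str(key) for key in context)
    print("Callback context keys:", ", ".join(keys))
    return keys
```

Attaching the same function at the DAG level and at the task level, for both success and failure, would let the resulting key lists be compared case by case.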




[GitHub] [airflow] cklimkowski commented on issue #26760: DAG on_failure_callback uses wrong context

Posted by "cklimkowski (via GitHub)" <gi...@apache.org>.
cklimkowski commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-1494844164

   Hi All!
   Hi @josh-fell,
   If the same callback is applied to all tasks, I think it would be triggered by each task failure instead of being executed only once at the DAG failure. Please share your thoughts.
   
   > If you want to gain the context from the failed task, you can set [`on_failure_callback` at the task level](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html?#airflow.models.baseoperator.BaseOperator) or, I presume you'd like the same callback to apply to all tasks, within [`default_args`](https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html?highlight=default_args#default-arguments).
   
   It would be nice to be able to get the earliest failed task instance from the context, or even all the task instances that failed.
   
   Please advise.
   




[GitHub] [airflow] josh-fell commented on issue #26760: DAG on_failure_callback uses wrong context

Posted by "josh-fell (via GitHub)" <gi...@apache.org>.
josh-fell commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-1413808038

   This has been coming up more frequently for users. I can take on related improvements here.




[GitHub] [airflow] josh-fell commented on issue #26760: DAG on_failure_callback uses wrong context

Posted by GitBox <gi...@apache.org>.
josh-fell commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-1261388181

   Yeah, looking at the Callbacks documentation, I definitely agree it's misleading and could be improved.




[GitHub] [airflow] eladkal commented on issue #26760: DAG on_failure_callback uses wrong context

Posted by GitBox <gi...@apache.org>.
eladkal commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-1264058174

   > Yeah, looking at the Callbacks documentation, I definitely agree it's misleading and could be improved.
   
   Actually both options are mentioned:
   https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/callbacks.html




[GitHub] [airflow] boring-cyborg[bot] commented on issue #26760: DAG on_failure_callback uses wrong context

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-1261347402

   Thanks for opening your first issue here! Be sure to follow the issue template!
   




[GitHub] [airflow] github-actions[bot] commented on issue #26760: DAG on_failure_callback uses wrong context

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-1533613907

   This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.




[GitHub] [airflow] josh-fell commented on issue #26760: DAG on_failure_callback uses wrong context

Posted by GitBox <gi...@apache.org>.
josh-fell commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-1261376667

   Hey @vgupta3! Have you tried applying your `on_failure_callback` to `default_args` instead? Setting [`on_failure_callback` at the DAG level](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG) applies to the DagRun failing, not to tasks specifically.
   
   > on_failure_callback (DagStateChangeCallback | None) – A function to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function.
   
   If you want to gain the context from the failed task, you can set [`on_failure_callback` at the task level](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html?#airflow.models.baseoperator.BaseOperator) or, I presume you'd like the same callback to apply to all tasks, within `default_args`.
   
   > on_failure_callback (TaskStateChangeCallback | None) – a function to be called when a task instance of this task fails. a context dictionary is passed as a single parameter to this function. Context contains references to related objects to the task instance and is documented under the macros section of the API.
   
   And, of course, if the documentation isn't clear enough you are absolutely welcomed and encouraged to open a PR to clarify concepts!
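
The `default_args` route described above can be sketched as follows. This is a minimal illustration rather than the project's recommended pattern: `task_failure_alert` and `fake_context` are hypothetical names, and the stand-in context exists only so the function can be exercised without a running Airflow instance.

```python
from types import SimpleNamespace


def task_failure_alert(context):
    """Task-level callback: the context belongs to the task instance that failed."""
    ti = context["task_instance"]
    message = f"Task {ti.task_id} in DAG {ti.dag_id} failed"
    print(message)
    return message  # returned only so the sketch can be checked locally


# Passed as DAG(default_args=default_args, ...), this applies the callback to
# every task in the DAG, so it runs per failed task rather than per DagRun.
default_args = {"on_failure_callback": task_failure_alert}

# Stand-in for a real context (assumption: only these two attributes are read).
fake_context = {"task_instance": SimpleNamespace(task_id="fail", dag_id="test_dag")}
task_failure_alert(fake_context)
```

Because the callback is attached to each task, a DagRun with several failing tasks would invoke it several times.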




[GitHub] [airflow] josh-fell commented on issue #26760: DAG on_failure_callback uses wrong context

Posted by "josh-fell (via GitHub)" <gi...@apache.org>.
josh-fell commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-1468100784

   @viniciusdsmello Where are you setting your `on_failure_callback`: on the DAG object, `default_args`, or on tasks themselves?




[GitHub] [airflow] uranusjr commented on issue #26760: DAG on_failure_callback uses wrong context

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-1261661345

   Perhaps we should rename the DAG-level argument to `on_dag_failure_callback` instead?




[GitHub] [airflow] gabrielboehme commented on issue #26760: DAG on_failure_callback uses wrong context

Posted by "gabrielboehme (via GitHub)" <gi...@apache.org>.
gabrielboehme commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-1675096321

   @viniciusdsmello I have encountered the same problem as you. I have the following dag which sends a Slack notification. In my case, task_1 throws an error, but the given context to the callback function is for task_2.
   
   `def slack_alert(context):
       slack_msg = \
           f"""
               :x: Task Failed.
               *Task*: {context.get('task_instance').task_id}
               *Dag*: {context.get('task_instance').dag_id}
               *Execution Time*: {context.get('execution_date')}
               <{context.get('task_instance').log_url}|*Logs*>
           """
       slk = SlackHelper()
       slk.send_message_to_channel(
           channel="#random_channel",
           text=slack_msg
       )`




[GitHub] [airflow] viniciusdsmello commented on issue #26760: DAG on_failure_callback uses wrong context

Posted by "viniciusdsmello (via GitHub)" <gi...@apache.org>.
viniciusdsmello commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-1465276221

   Hello everyone. Could you let me know if there are any updates on this issue?
   
   I had the same issue with the following DAG
   <img width="494" alt="image" src="https://user-images.githubusercontent.com/6565443/224566895-4e5c77b0-16d1-4d04-a94f-151e0d13b145.png">
   
   I'm creating an on_failure_callback to send events to Pagerduty. In one of my tests, I noticed that the context contains information from the first task.
   
   Here is the callback function
   <img width="650" alt="image" src="https://user-images.githubusercontent.com/6565443/224567061-83737709-b297-4501-9012-6d990846c75a.png">
   
   




[GitHub] [airflow] josh-fell commented on issue #26760: DAG on_failure_callback uses wrong context

Posted by GitBox <gi...@apache.org>.
josh-fell commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-1264153224

   > Actually both options are mentioned: https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/callbacks.html
   
   Callbacks are set at both the DAG and task level in the code snippets, but the copy in the doc only mentions _tasks_. Even the callback callable of `on_failure_callback=task_failure_alert` set at the DAG is named in a misleading way. The callback is triggered when the _DagRun_ fails, which is similar to a task failing, but the context provided to the callable isn't necessarily related to the task that failed. IIRC the context passed to the DAG-level callback is either the first task's context or just whatever is returned from the metadatabase first. IMO the doc can use a glow-up to help with these distinctions.
   
   > Perhaps we should rename the DAG-level argument to on_dag_failure_callback instead?
   
   +1
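
Given the distinction drawn above, a DAG-level callback is probably safest when it reports on the DagRun and ignores `context["task_instance"]`. The sketch below assumes the context exposes a `dag_run` entry (another comment in this thread reads it with `context.get("dag_run")`) and treats a `reason` entry as optional; `dag_failure_alert` is a hypothetical name.

```python
def dag_failure_alert(context):
    """DAG-level callback: describe the failed DagRun, not an arbitrary task."""
    dag_run = context["dag_run"]
    # "reason" is an assumption about the DAG-level context; fall back if absent.
    reason = context.get("reason", "unknown")
    message = f"DagRun {dag_run.run_id} of {dag_run.dag_id} failed (reason: {reason})"
    print(message)
    return message  # returned only so the sketch can be checked locally
```

It would be passed as `DAG(..., on_failure_callback=dag_failure_alert)`, and naming it after the DAG rather than a task avoids the misleading naming mentioned above.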




[GitHub] [airflow] will-byrne-cardano commented on issue #26760: DAG on_failure_callback uses wrong context

Posted by "will-byrne-cardano (via GitHub)" <gi...@apache.org>.
will-byrne-cardano commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-1673495862

   > Hi All! Hi @josh-fell, If the same callback is applied to all tasks, I think, it would be triggered by each task failure instead of being executed only once at the DAG failure. Please share your thoughts.
   > 
   > > If you want to gain the context from the failed task, you can set [`on_failure_callback` at the task level](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html?#airflow.models.baseoperator.BaseOperator) or, I presume you'd like the same callback to apply to all tasks, within [`default_args`](https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html?highlight=default_args#default-arguments).
   > 
   > It would be nice to be capable of getting the earliest failed task instance from the context or even all the task instances that failed.
   > 
   > Please advise.
   
   Hi @cklimkowski 
   
   One possible workaround you could use that I have employed in some of my dags to find "the earliest failed task instance from the context or even all the task instances that failed" is to use the following pattern when designing callback functions:
   
   ```python
   def failure_callback(context):
       dag_run = context.get("dag_run")
       tis = dag_run.get_task_instances()
       failed_instances = [ti for ti in tis if ti.state == "failed"]
       failed_instance_ids = [ti.task_id for ti in failed_instances]
       ...  # add further logic, e.g. take the first element for the first failed task
   ```
   
   This should always get at least the first failed task instance. However, if a failed task triggers a dag failure and there are still other tasks running, and those tasks end up failing, it won't pick these up as their state will be `running` when the callback fires. A potential workaround (that I've not tested) if you need all failed is to implement a retry with a backoff on your callback that waits until no tasks have the state `running`. This could lead to complications with undead tasks though, so be sure to set some limit on number of retries if you are to do this. 
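
The untested retry-with-backoff idea above could look roughly like this. It is a sketch under stated assumptions: each call to `dag_run.get_task_instances()` is assumed to return current states, task instances are assumed to carry an `end_date` usable for ordering, and the helper names `failed_task_ids` and `wait_until_settled` are hypothetical.

```python
import time


def failed_task_ids(task_instances):
    """Return the task_ids of failed instances, earliest finished first."""
    failed = [ti for ti in task_instances if ti.state == "failed"]
    # Instances without an end_date sort last; the bool avoids comparing None.
    failed.sort(key=lambda ti: (ti.end_date is None, ti.end_date or 0))
    return [ti.task_id for ti in failed]


def wait_until_settled(get_task_instances, max_attempts=5, base_delay=1.0,
                       sleep=time.sleep):
    """Poll until no instance is running, with exponential backoff and a cap."""
    for attempt in range(max_attempts):
        tis = get_task_instances()
        if not any(ti.state == "running" for ti in tis):
            return tis
        sleep(base_delay * 2 ** attempt)
    # Give up waiting; some tasks may still be running in this last snapshot.
    return get_task_instances()


def failure_callback(context):
    dag_run = context["dag_run"]
    tis = wait_until_settled(dag_run.get_task_instances)
    return failed_task_ids(tis)
```

The cap on `max_attempts` is what guards against tasks that never leave the `running` state. Blocking inside a callback delays whatever process runs it, so small values for `max_attempts` and `base_delay` are the safer choice.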




[GitHub] [airflow] dgbarmac commented on issue #26760: DAG on_failure_callback uses wrong context

Posted by "dgbarmac (via GitHub)" <gi...@apache.org>.
dgbarmac commented on issue #26760:
URL: https://github.com/apache/airflow/issues/26760#issuecomment-1626037839

   Hi,
   
   I think I have a related problem. I'm using Airflow v2.5.1 and testing using the following code:
   
   ```
   from airflow import DAG
   from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator is the deprecated path
   from datetime import datetime, timedelta
   from utils import UtilsFunctions
   
   default_args = {
       'owner': 'owner',
       'email_on_failure': False,
       'email_on_retry': False,
       'email': ['airflow@airflow.com'],
   }
   
   def failure_callback_dag(context):
       print("Inside failure_callback_dag")
       print(context["task_instance"])
       print(context["task"])
   
   with DAG(
       dag_id='sns_test',
       default_args=default_args,
       start_date=datetime(2023, 1, 1),
       schedule_interval=None,
       on_success_callback=None,
       on_failure_callback=failure_callback_dag,
       catchup=False,
   ) as dag:
       task = BashOperator(
           task_id="bash-error",
           bash_command=f"error",
       )
   
       task
   ```
   Checking the logs I can't see the prints that I made. I think that the callback is not even being called. Using the default_args works but I would like to have a callback called only once.
   
   I can't see what I am doing wrong.
   
   Do you have any ideas?

