You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Siddharth Anand (JIRA)" <ji...@apache.org> on 2016/05/12 02:54:12 UTC

[jira] [Comment Edited] (AIRFLOW-106) email_on_failure doesn't being triggered because dag FAILED before task_retries execute

    [ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15281136#comment-15281136 ] 

Siddharth Anand edited comment on AIRFLOW-106 at 5/12/16 2:54 AM:
------------------------------------------------------------------

Thanks for reporting this. It's a serious bug and I have escalated it to blocker. I've assigned it to myself to better spread knowledge about scheduler changes around the committers. 

On Apr 4, the following commit was made to handle deadlocks.
https://github.com/apache/incubator-airflow/commit/2e0421a28347de9a24bb14f37d33988c50b901b2

Here's what is happening: 
* Your task is run in TaskInstance.run() in models.py
* When it fails, it calls self.handle_failure(e, test_mode, context) in the Exception
 block
* Since self. try_num==1 and task.retries==1, the task is placed in the UP_FOR_RETRY state
* The scheduler (in jobs.py), next runs process_dag(self, dag, queue)
* Since the task is UP_FOR_RETRY and since the retry interval has not yet passed, the task is added to a could_not_run set
* Because len(could_not_run) == len(descartes), the dag is incorrectly deemed to be in a deadlocked state and the DagRun is immediately FAILED

There are a few problems with this:
1. The retries are not honored
2. A DagRun is Failed, but there are TaskInstances left in a variety of states (e.g. UP_FOR_RETRY or even blank)
3. Because the TaskInstance is not allowed to fail till its retries expire, it never get to the point of being a failed TaskInstance -- email_on_failure is called on failed TaskInstances, not Failed DagRuns!

When does this bug not occur?
* If the failing TaskInstance is not the first TaskInstance in the DAG, this bug does not present itself. Though, I do need to verify if the DAGRun will be left in RUNNING state or correctly FAILED. 
* If the user disables retries, then this bug does not occur, since the very first time into the handle_failure() method, the email_on_error will be executed

cc [~jlowin]


was (Author: sanand):
Thanks for reporting this. It's a serious bug and I have escalated it to blocker. I've assigned it to myself to better spread knowledge about scheduler changes around the committers. 

On Apr 4, the following commit was made to handle deadlocks.
https://github.com/apache/incubator-airflow/commit/2e0421a28347de9a24bb14f37d33988c50b901b2

Here's what is happening: 
* Your task is run in TaskInstance.run() in models.py
* When it fails, it calls self.handle_failure(e, test_mode, context) in the Exception
 block
* Since self. try_num==1 and task.retries==1, the task is placed in the UP_FOR_RETRY state
* The scheduler (in jobs.py), next runs process_dag(self, dag, queue)
* Since the task is UP_FOR_RETRY and since the retry interval has not yet passed, the task is added to a could_not_run set
* Because len(could_not_run) == len(descartes), the dag is incorrectly deemed to be in a deadlocked state and the DagRun is immediately FAILED

There are a few problems with this:
1. The retries are not honored
2. A DagRun is Failed, but there are TaskInstances left in a variety of states (e.g. UP_FOR_RETRY or even blank)
3. Because the TaskInstance is not allowed to fail till its retries expire, it never get to the point of being a failed TaskInstance -- email_on_failure is called on failed TaskInstances, not Failed DagRuns!

The funny thing. If the first task in the DAG run succeeds, then the existing logic works fine. If it is the first task that fails, then we enter this problematic state,
cc [~jlowin]

> email_on_failure doesn't being triggered because dag FAILED before task_retries execute
> ---------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-106
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-106
>             Project: Apache Airflow
>          Issue Type: Bug
>         Environment: Latest version from Git
>            Reporter: dud
>            Assignee: Siddharth Anand
>            Priority: Blocker
>
> Hello.
> I created the following workflow :
> {code}
> from airflow import DAG
> from airflow.operators import PythonOperator
> from datetime import datetime, timedelta
> from airflow.models import Variable
> from time import sleep
> default_args = {
>     'depends_on_past': False,
>     'start_date': datetime(2016, 5, 11, 15, 20),
>     'email': <my email>
>     'email_on_failure': True,
>     'email_on_retry': False,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=2),
>     'end_date': datetime(2016, 5, 11, 16, 00),
> }
> PARENT_DAG_NAME = 'test'
> dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=timedelta(minutes=10))
> def sleep1_function(**kwargs):
>     sleep(90)
>     return Variable.get('test_var')
> sleep1 = PythonOperator(
>     task_id='sleep1',
>     python_callable=sleep1_function,
>     dag=dag)
> {code}
> I forgot to declare test_var so when this DAG launched it failed quickly. However no failure email was ever sent. Clearing the failed task to make it rerun doesn't trigger any email.
> Here is the logs :
> {code}
> [2016-05-11 15:53:31,784] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
> [2016-05-11 15:53:32,272] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
> [2016-05-11 15:53:32,313] {models.py:1216} INFO - 
> --------------------------------------------------------------------------------
> Starting attempt 1 of 2
> --------------------------------------------------------------------------------
> [2016-05-11 15:53:32,333] {models.py:1239} INFO - Executing <Task(PythonOperator): sleep1> on 2016-05-11 15:20:00
> [2016-05-11 15:55:03,450] {models.py:1306} ERROR - Variable test_var does not exist
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 1265, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/operators/python_operator.py", line 66, in execute
>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/var/lib/airflow/airflow/dags/test.py", line 31, in sleep1_function
>     return Variable.get('test_var')
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 3145, in get
>     raise ValueError('Variable {} does not exist'.format(key))
> ValueError: Variable test_var does not exist
> [2016-05-11 15:55:03,581] {models.py:1318} INFO - Marking task as UP_FOR_RETRY
> [2016-05-11 15:55:03,759] {models.py:1347} ERROR - Variable test_var does not exist
> {code}
> In the DAG Runs page, the workflow is set as failed. In hte taks instance page, it is set as up_for_retry but no new run is ever scheduled.
> I tried incrementing the retires parameter, but nothing different happens, Airflow never retries after the first run.
> dud



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)