Posted to commits@airflow.apache.org by "Siddharth Anand (JIRA)" <ji...@apache.org> on 2016/05/12 03:03:12 UTC

[jira] [Updated] (AIRFLOW-106) Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails

     [ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Siddharth Anand updated AIRFLOW-106:
------------------------------------
    Summary: Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails  (was: email_on_failure doesn't being triggered because dag FAILED before task_retries execute)

> Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails
> ------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-106
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-106
>             Project: Apache Airflow
>          Issue Type: Bug
>         Environment: Latest version from Git
>            Reporter: dud
>            Assignee: Siddharth Anand
>            Priority: Blocker
>
> Hello.
> I created the following workflow:
> {code}
> from airflow import DAG
> from airflow.operators import PythonOperator
> from datetime import datetime, timedelta
> from airflow.models import Variable
> from time import sleep
> default_args = {
>     'depends_on_past': False,
>     'start_date': datetime(2016, 5, 11, 15, 20),
>     'email': <my email>,
>     'email_on_failure': True,
>     'email_on_retry': False,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=2),
>     'end_date': datetime(2016, 5, 11, 16, 00),
> }
> PARENT_DAG_NAME = 'test'
> dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=timedelta(minutes=10))
> def sleep1_function(**kwargs):
>     sleep(90)
>     return Variable.get('test_var')
> sleep1 = PythonOperator(
>     task_id='sleep1',
>     python_callable=sleep1_function,
>     dag=dag)
> {code}
> I forgot to declare test_var, so this DAG failed soon after it launched. However, no failure email was ever sent, and clearing the failed task to make it rerun doesn't trigger an email either.
> Here are the logs:
> {code}
> [2016-05-11 15:53:31,784] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
> [2016-05-11 15:53:32,272] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
> [2016-05-11 15:53:32,313] {models.py:1216} INFO - 
> --------------------------------------------------------------------------------
> Starting attempt 1 of 2
> --------------------------------------------------------------------------------
> [2016-05-11 15:53:32,333] {models.py:1239} INFO - Executing <Task(PythonOperator): sleep1> on 2016-05-11 15:20:00
> [2016-05-11 15:55:03,450] {models.py:1306} ERROR - Variable test_var does not exist
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 1265, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/operators/python_operator.py", line 66, in execute
>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/var/lib/airflow/airflow/dags/test.py", line 31, in sleep1_function
>     return Variable.get('test_var')
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 3145, in get
>     raise ValueError('Variable {} does not exist'.format(key))
> ValueError: Variable test_var does not exist
> [2016-05-11 15:55:03,581] {models.py:1318} INFO - Marking task as UP_FOR_RETRY
> [2016-05-11 15:55:03,759] {models.py:1347} ERROR - Variable test_var does not exist
> {code}
> On the DAG Runs page, the workflow is marked as failed. On the task instance page, the task is marked as up_for_retry, but no new run is ever scheduled.
> I tried increasing the retries parameter, but nothing changes: Airflow never retries after the first run.
> dud
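
For readers following the report, the retry behavior it expects can be sketched in plain Python. This is only an illustration of the expected semantics under the settings quoted above (retries=1, email_on_failure=True, email_on_retry=False); it is not Airflow's implementation. The class TaskSketch, its fields, and the notifications list are hypothetical names introduced here, and the retry_delay wait is deliberately left out.

```python
from dataclasses import dataclass, field
from typing import Callable, List

# State names for this sketch only; they mirror the states named in the report.
UP_FOR_RETRY = "up_for_retry"
FAILED = "failed"
SUCCESS = "success"


@dataclass
class TaskSketch:
    """Toy stand-in for a task instance (hypothetical, not Airflow's TaskInstance)."""
    run: Callable[[], object]
    retries: int = 1
    email_on_failure: bool = True
    email_on_retry: bool = False
    try_number: int = 0
    state: str = ""
    notifications: List[str] = field(default_factory=list)

    def attempt(self) -> str:
        """Run one attempt and record the resulting state."""
        self.try_number += 1
        try:
            self.run()
        except Exception as exc:
            if self.try_number <= self.retries:
                # Retries remain: the task should be picked up again later.
                self.state = UP_FOR_RETRY
                if self.email_on_retry:
                    self.notifications.append(f"retry: {exc}")
            else:
                # Retries exhausted: only now is the failure reported.
                self.state = FAILED
                if self.email_on_failure:
                    self.notifications.append(f"failure: {exc}")
        else:
            self.state = SUCCESS
        return self.state


def missing_variable():
    # Same error message as in the quoted traceback.
    raise ValueError("Variable test_var does not exist")


task = TaskSketch(run=missing_variable, retries=1)
states = [task.attempt()]
while task.state == UP_FOR_RETRY:
    states.append(task.attempt())
print(states, task.notifications)
```

In this sketch the first failure leaves the task in up_for_retry without a notification, and the failure notification is recorded only after the second attempt fails. The report describes the run stopping after the first of those two steps.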



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)