Posted to commits@airflow.apache.org by "dud (JIRA)" <ji...@apache.org> on 2016/05/11 17:24:13 UTC

[jira] [Updated] (AIRFLOW-106) email_on_failure doesn't being triggered

     [ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dud updated AIRFLOW-106:
------------------------
    Description: 
Hello.

I created the following workflow:

{code}
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from airflow.models import Variable
from time import sleep

default_args = {
    'depends_on_past': False,
    'start_date': datetime(2016, 5, 11, 15, 20),
    'email': '<my email>',  # placeholder: address omitted in the report
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
    'end_date': datetime(2016, 5, 11, 16, 00),
}

PARENT_DAG_NAME = 'test'

dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=timedelta(minutes=10))

def sleep1_function(**kwargs):
    sleep(90)
    return Variable.get('test_var')

sleep1 = PythonOperator(
    task_id='sleep1',
    python_callable=sleep1_function,
    dag=dag)
{code}

I forgot to declare test_var, so when this DAG launched, it failed quickly. However, no failure email was ever sent. Clearing the failed task to make it rerun doesn't trigger an email either.

Here are the logs:
{code}
[2016-05-11 15:53:31,784] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
[2016-05-11 15:53:32,272] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
[2016-05-11 15:53:32,313] {models.py:1216} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 2
--------------------------------------------------------------------------------

[2016-05-11 15:53:32,333] {models.py:1239} INFO - Executing <Task(PythonOperator): sleep1> on 2016-05-11 15:20:00
[2016-05-11 15:55:03,450] {models.py:1306} ERROR - Variable test_var does not exist
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 1265, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/operators/python_operator.py", line 66, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/var/lib/airflow/airflow/dags/test.py", line 31, in sleep1_function
    return Variable.get('test_var')
  File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 3145, in get
    raise ValueError('Variable {} does not exist'.format(key))
ValueError: Variable test_var does not exist
[2016-05-11 15:55:03,581] {models.py:1318} INFO - Marking task as UP_FOR_RETRY
[2016-05-11 15:55:03,759] {models.py:1347} ERROR - Variable test_var does not exist
{code}

On the DAG Runs page, the workflow is marked as failed. On the task instance page, the task is marked as up_for_retry, but no new run is ever scheduled.

I tried incrementing the retries parameter, but nothing changes: Airflow never retries after the first run.
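
For reference, here is a minimal sketch of how the settings in default_args are expected to interact. It is a simplified model written for illustration, not Airflow's implementation, and the function name expected_outcome is hypothetical. Under this model, no email is due after attempt 1 because email_on_retry is False; the failure email would only be due once attempt 2 fails, and that attempt never ran here.

```python
from datetime import datetime, timedelta


def expected_outcome(attempt, retries, email_on_failure, email_on_retry):
    """Return (state, email_expected) after a failed attempt.

    Simplified model for illustration only; it is not Airflow's code.
    `attempt` is 1-based, and a task gets `retries + 1` attempts in total.
    """
    if attempt <= retries:
        # Retries remain: the task waits for another attempt.
        return "up_for_retry", email_on_retry
    # No retries left: the failure is final.
    return "failed", email_on_failure


# Settings taken from default_args in the report.
settings = dict(retries=1, email_on_failure=True, email_on_retry=False)

first = expected_outcome(attempt=1, **settings)
second = expected_outcome(attempt=2, **settings)

# Attempt 1 failed at 15:55:03 according to the log; with a retry_delay of
# two minutes, a second attempt would be eligible from this time on.
failed_at = datetime(2016, 5, 11, 15, 55, 3)
retry_eligible_at = failed_at + timedelta(minutes=2)

print(first)   # ('up_for_retry', False)
print(second)  # ('failed', True)
print(retry_eligible_at)  # 2016-05-11 15:57:03
```

So the missing email after the first attempt fits the configuration, while the second attempt that never gets scheduled is the unexpected part.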

dud


> email_on_failure doesn't being triggered
> ----------------------------------------
>
>                 Key: AIRFLOW-106
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-106
>             Project: Apache Airflow
>          Issue Type: Bug
>         Environment: Latest version from Git
>            Reporter: dud



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)