You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Dan Davydov (Jira)" <ji...@apache.org> on 2019/12/13 15:17:00 UTC
[jira] [Commented] (AIRFLOW-6184) Add `email_on_retry` and
`email_on_failure` for DAGs
[ https://issues.apache.org/jira/browse/AIRFLOW-6184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995695#comment-16995695 ]
Dan Davydov commented on AIRFLOW-6184:
--------------------------------------
This might not be necessary, using the existing on_failure/on_success callbacks might be satisfactory, e.g. we can add a utility function like below (credit to this ticket's reporter for writing it):
{code:java}
def email_on_failure_callback(emails, retries=3, wait_seconds=1):
"""Callback generator to send emails on DAG failure. Designed to be used with `on_failure_callback`.
To avoid blocking the scheduler, exceptions are catched and logged. To see the logs access the
Args:
emails (list,str): list of emails to send the email to (comma or semicolon delimited)
retries (int): Number of times to try to send the email.
wait_seconds (int): Seconds to wait between retries.
Returns:
Callback function to send email to specified email. Templated email includes important information and links to the instance.
"""
@_catch_exception
@retry(
stop=stop_after_attempt(retries),
wait=wait_fixed(wait_seconds),
before=before_log(LOGGER, logging.DEBUG),
after=after_log(LOGGER, logging.DEBUG)
def notify_failure(context):
LOGGER.info('Attempting to send email notification for failure to {}'.format(emails))
iso = quote(context.get('execution_date').isoformat())
base_url = context.get('conf').get('webserver', 'BASE_URL') context.update({'dag_url': base_url + '/admin/airflow/graph?execution_date={iso}&dag_id={dag_id}'.format(
iso=iso, dag_id=context.get('dag_run').dag_id
)})
html_content = """
DAG {dag_run.dag_id} has failed<br>
<br>
Execution date: {execution_date}<br>
First task instance log: <a href="{ti.log_url}">Log Link</a><br>
<br>
<a href="{dag_url}">Open failed DAGrun</a><br>
""".format(**context)
subject = 'Airflow alert: {dag_run.dag_id}'.format(**context)
email.send_email(emails, subject, html_content)
{code}
And then users can just do the following in a DAG constructor:
{code:java}
with DAG('monitoring_example', on_failure_callback=email_on_failure_callback('test-email@gmail.com')) as dag: {code}
But https://issues.apache.org/jira/browse/AIRFLOW-6253 should be resolved first so that the callbacks don't affect scheduler performance
> Add `email_on_retry` and `email_on_failure` for DAGs
> ----------------------------------------------------
>
> Key: AIRFLOW-6184
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6184
> Project: Apache Airflow
> Issue Type: Improvement
> Components: DagRun, scheduler
> Affects Versions: 1.10.7
> Reporter: Gerard Casas Saez
> Priority: Minor
>
> Currently, there's support for `email_on_retry` and `email_on_failure` on operators.
> As a user I would like to have an easy way to be notified if the DAG is marked as failed. As I may want to get paged if so.
>
> Adding a notification operator may fail if the workers are not getting jobs scheduled.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)