Posted to commits@airflow.apache.org by "Dan Davydov (Jira)" <ji...@apache.org> on 2019/12/13 15:17:00 UTC

[jira] [Commented] (AIRFLOW-6184) Add `email_on_retry` and `email_on_failure` for DAGs

    [ https://issues.apache.org/jira/browse/AIRFLOW-6184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995695#comment-16995695 ] 

Dan Davydov commented on AIRFLOW-6184:
--------------------------------------

This might not be necessary; the existing on_failure/on_success callbacks might be sufficient. For example, we could add a utility function like the one below (credit to this ticket's reporter for writing it):
{code:python}
import logging
from urllib.parse import quote

from airflow.utils import email
from tenacity import after_log, before_log, retry, stop_after_attempt, wait_fixed

LOGGER = logging.getLogger(__name__)


def email_on_failure_callback(emails, retries=3, wait_seconds=1):
    """Callback generator to send emails on DAG failure. Designed to be used with `on_failure_callback`.

    To avoid blocking the scheduler, exceptions are caught and logged. To see the logs access the

    Args:
        emails (list, str): list of emails to send the email to (comma or semicolon delimited)
        retries (int): Number of times to try to send the email.
        wait_seconds (int): Seconds to wait between retries.

    Returns:
        Callback function to send email to the specified emails. The templated email includes
        important information and links to the instance.
    """
    # _catch_exception is a helper from the reporter's code that is not shown in this comment.
    @_catch_exception
    @retry(
        stop=stop_after_attempt(retries),
        wait=wait_fixed(wait_seconds),
        before=before_log(LOGGER, logging.DEBUG),
        after=after_log(LOGGER, logging.DEBUG),
    )
    def notify_failure(context):
        LOGGER.info('Attempting to send email notification for failure to {}'.format(emails))
        iso = quote(context.get('execution_date').isoformat())
        base_url = context.get('conf').get('webserver', 'BASE_URL')
        # Add a link to the failed DAG run so the email template can reference it.
        context.update({
            'dag_url': base_url + '/admin/airflow/graph?execution_date={iso}&dag_id={dag_id}'.format(
                iso=iso, dag_id=context.get('dag_run').dag_id
            )
        })
        html_content = """
        DAG {dag_run.dag_id} has failed<br>
        <br>
        Execution date: {execution_date}<br>
        First task instance log: <a href="{ti.log_url}">Log Link</a><br>
        <br>
        <a href="{dag_url}">Open failed DAGrun</a><br>
        """.format(**context)
        subject = 'Airflow alert: {dag_run.dag_id}'.format(**context)
        email.send_email(emails, subject, html_content)

    # Return the inner function so it can be passed as `on_failure_callback`.
    return notify_failure
{code}
And then users can just do the following in a DAG constructor:
{code:python}
with DAG('monitoring_example', on_failure_callback=email_on_failure_callback('test-email@gmail.com')) as dag:
    ...  # define tasks here
{code}
 

However, https://issues.apache.org/jira/browse/AIRFLOW-6253 should be resolved first so that the callbacks don't affect scheduler performance.

> Add `email_on_retry` and `email_on_failure` for DAGs
> ----------------------------------------------------
>
>                 Key: AIRFLOW-6184
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6184
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: DagRun, scheduler
>    Affects Versions: 1.10.7
>            Reporter: Gerard Casas Saez
>            Priority: Minor
>
> Currently, there's support for `email_on_retry` and `email_on_failure` on operators.
> As a user, I would like an easy way to be notified when a DAG is marked as failed, since I may want to get paged in that case.
>  
> Adding a notification operator may fail if the workers are not getting jobs scheduled.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)