You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/04 15:22:42 UTC

[airflow] branch master updated: Add SMTP timeout and retry limit for SMTP email backend. (#12801)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 88aa174  Add SMTP timeout and retry limit for SMTP email backend. (#12801)
88aa174 is described below

commit 88aa174047e673d0dbe2742b24210731d6a5378f
Author: Siddartha Ravichandran <sr...@expedia.com>
AuthorDate: Fri Dec 4 10:20:39 2020 -0500

    Add SMTP timeout and retry limit for SMTP email backend. (#12801)
---
 airflow/config_templates/config.yml          |  12 +++
 airflow/config_templates/default_airflow.cfg |   2 +
 airflow/config_templates/default_test.cfg    |   2 +
 airflow/utils/email.py                       |  35 ++++++--
 tests/utils/test_email.py                    | 127 ++++++++++++++++++++++-----
 5 files changed, 150 insertions(+), 28 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 8e65ef7..a70ddec 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1255,6 +1255,18 @@
       type: string
       example: ~
       default: "airflow@example.com"
+    - name: smtp_timeout
+      description: ~
+      version_added: ~
+      type: int
+      example: ~
+      default: "30"
+    - name: smtp_retry_limit
+      description: ~
+      version_added: ~
+      type: int
+      example: ~
+      default: "5"
 - name: sentry
   description: |
     Sentry (https://docs.sentry.io) integration. Here you can supply
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 1d670d4..3cf316b 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -611,6 +611,8 @@ smtp_ssl = False
 # smtp_password =
 smtp_port = 25
 smtp_mail_from = airflow@example.com
+smtp_timeout = 30
+smtp_retry_limit = 5
 
 [sentry]
 
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 824565d..767176d 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -87,6 +87,8 @@ smtp_user = airflow
 smtp_port = 25
 smtp_password = airflow
 smtp_mail_from = airflow@example.com
+smtp_retry_limit = 5
+smtp_timeout = 30
 
 [celery]
 celery_app_name = airflow.executors.celery_executor
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index 17d3b6e..8e4359b 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -168,6 +168,8 @@ def send_mime_email(e_from: str, e_to: List[str], mime_msg: MIMEMultipart, dryru
     smtp_port = conf.getint('smtp', 'SMTP_PORT')
     smtp_starttls = conf.getboolean('smtp', 'SMTP_STARTTLS')
     smtp_ssl = conf.getboolean('smtp', 'SMTP_SSL')
+    smtp_retry_limit = conf.getint('smtp', 'SMTP_RETRY_LIMIT')
+    smtp_timeout = conf.getint('smtp', 'SMTP_TIMEOUT')
     smtp_user = None
     smtp_password = None
 
@@ -178,14 +180,23 @@ def send_mime_email(e_from: str, e_to: List[str], mime_msg: MIMEMultipart, dryru
         log.debug("No user/password found for SMTP, so logging in with no authentication.")
 
     if not dryrun:
-        conn = smtplib.SMTP_SSL(smtp_host, smtp_port) if smtp_ssl else smtplib.SMTP(smtp_host, smtp_port)
-        if smtp_starttls:
-            conn.starttls()
-        if smtp_user and smtp_password:
-            conn.login(smtp_user, smtp_password)
-        log.info("Sent an alert email to %s", e_to)
-        conn.sendmail(e_from, e_to, mime_msg.as_string())
-        conn.quit()
+        for attempt in range(1, smtp_retry_limit + 1):
+            log.info("Email alerting: attempt %s", str(attempt))
+            try:
+                conn = _get_smtp_connection(smtp_host, smtp_port, smtp_timeout, smtp_ssl)
+            except smtplib.SMTPServerDisconnected:
+                if attempt < smtp_retry_limit:
+                    continue
+                raise
+
+            if smtp_starttls:
+                conn.starttls()
+            if smtp_user and smtp_password:
+                conn.login(smtp_user, smtp_password)
+            log.info("Sent an alert email to %s", e_to)
+            conn.sendmail(e_from, e_to, mime_msg.as_string())
+            conn.quit()
+            break
 
 
 def get_email_address_list(addresses: Union[str, Iterable[str]]) -> List[str]:
@@ -202,6 +213,14 @@ def get_email_address_list(addresses: Union[str, Iterable[str]]) -> List[str]:
     raise TypeError(f"Unexpected argument type: Received '{received_type}'.")
 
 
+def _get_smtp_connection(host: str, port: int, timeout: int, with_ssl: bool) -> smtplib.SMTP:
+    return (
+        smtplib.SMTP_SSL(host=host, port=port, timeout=timeout)
+        if with_ssl
+        else smtplib.SMTP(host=host, port=port, timeout=timeout)
+    )
+
+
 def _get_email_list_from_str(addresses: str) -> List[str]:
     delimiters = [",", ";"]
     for delimiter in delimiters:
diff --git a/tests/utils/test_email.py b/tests/utils/test_email.py
index 8081fc8..8966e97 100644
--- a/tests/utils/test_email.py
+++ b/tests/utils/test_email.py
@@ -21,6 +21,7 @@ import unittest
 from email.mime.application import MIMEApplication
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
+from smtplib import SMTPServerDisconnected
 from unittest import mock
 
 from airflow import utils
@@ -118,7 +119,6 @@ class TestEmail(unittest.TestCase):
         self.assertEqual(msg['To'], ','.join(recipients))
 
 
-@conf_vars({('smtp', 'SMTP_SSL'): 'False'})
 class TestEmailSmtp(unittest.TestCase):
     @mock.patch('airflow.utils.email.send_mime_email')
     def test_send_smtp(self, mock_send_mime):
@@ -127,10 +127,10 @@ class TestEmailSmtp(unittest.TestCase):
         attachment.seek(0)
         utils.email.send_email_smtp('to', 'subject', 'content', files=[attachment.name])
         self.assertTrue(mock_send_mime.called)
-        call_args = mock_send_mime.call_args[0]
-        self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), call_args[0])
-        self.assertEqual(['to'], call_args[1])
-        msg = call_args[2]
+        _, call_args = mock_send_mime.call_args
+        self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), call_args['e_from'])
+        self.assertEqual(['to'], call_args['e_to'])
+        msg = call_args['mime_msg']
         self.assertEqual('subject', msg['Subject'])
         self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
         self.assertEqual(2, len(msg.get_payload()))
@@ -143,8 +143,8 @@ class TestEmailSmtp(unittest.TestCase):
     def test_send_smtp_with_multibyte_content(self, mock_send_mime):
         utils.email.send_email_smtp('to', 'subject', '🔥', mime_charset='utf-8')
         self.assertTrue(mock_send_mime.called)
-        call_args = mock_send_mime.call_args[0]
-        msg = call_args[2]
+        _, call_args = mock_send_mime.call_args
+        msg = call_args['mime_msg']
         mimetext = MIMEText('🔥', 'mixed', 'utf-8')
         self.assertEqual(mimetext.get_payload(), msg.get_payload()[0].get_payload())
 
@@ -155,10 +155,10 @@ class TestEmailSmtp(unittest.TestCase):
         attachment.seek(0)
         utils.email.send_email_smtp('to', 'subject', 'content', files=[attachment.name], cc='cc', bcc='bcc')
         self.assertTrue(mock_send_mime.called)
-        call_args = mock_send_mime.call_args[0]
-        self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), call_args[0])
-        self.assertEqual(['to', 'cc', 'bcc'], call_args[1])
-        msg = call_args[2]
+        _, call_args = mock_send_mime.call_args
+        self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), call_args['e_from'])
+        self.assertEqual(['to', 'cc', 'bcc'], call_args['e_to'])
+        msg = call_args['mime_msg']
         self.assertEqual('subject', msg['Subject'])
         self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
         self.assertEqual(2, len(msg.get_payload()))
@@ -173,13 +173,14 @@ class TestEmailSmtp(unittest.TestCase):
     @mock.patch('smtplib.SMTP')
     def test_send_mime(self, mock_smtp, mock_smtp_ssl):
         mock_smtp.return_value = mock.Mock()
-        mock_smtp_ssl.return_value = mock.Mock()
         msg = MIMEMultipart()
         utils.email.send_mime_email('from', 'to', msg, dryrun=False)
         mock_smtp.assert_called_once_with(
-            conf.get('smtp', 'SMTP_HOST'),
-            conf.getint('smtp', 'SMTP_PORT'),
+            host=conf.get('smtp', 'SMTP_HOST'),
+            port=conf.getint('smtp', 'SMTP_PORT'),
+            timeout=conf.getint('smtp', 'SMTP_TIMEOUT'),
         )
+        self.assertFalse(mock_smtp_ssl.called)
         self.assertTrue(mock_smtp.return_value.starttls.called)
         mock_smtp.return_value.login.assert_called_once_with(
             conf.get('smtp', 'SMTP_USER'),
@@ -191,21 +192,20 @@ class TestEmailSmtp(unittest.TestCase):
     @mock.patch('smtplib.SMTP_SSL')
     @mock.patch('smtplib.SMTP')
     def test_send_mime_ssl(self, mock_smtp, mock_smtp_ssl):
-        mock_smtp.return_value = mock.Mock()
         mock_smtp_ssl.return_value = mock.Mock()
         with conf_vars({('smtp', 'smtp_ssl'): 'True'}):
             utils.email.send_mime_email('from', 'to', MIMEMultipart(), dryrun=False)
         self.assertFalse(mock_smtp.called)
         mock_smtp_ssl.assert_called_once_with(
-            conf.get('smtp', 'SMTP_HOST'),
-            conf.getint('smtp', 'SMTP_PORT'),
+            host=conf.get('smtp', 'SMTP_HOST'),
+            port=conf.getint('smtp', 'SMTP_PORT'),
+            timeout=conf.getint('smtp', 'SMTP_TIMEOUT'),
         )
 
     @mock.patch('smtplib.SMTP_SSL')
     @mock.patch('smtplib.SMTP')
     def test_send_mime_noauth(self, mock_smtp, mock_smtp_ssl):
         mock_smtp.return_value = mock.Mock()
-        mock_smtp_ssl.return_value = mock.Mock()
         with conf_vars(
             {
                 ('smtp', 'smtp_user'): None,
@@ -215,8 +215,9 @@ class TestEmailSmtp(unittest.TestCase):
             utils.email.send_mime_email('from', 'to', MIMEMultipart(), dryrun=False)
         self.assertFalse(mock_smtp_ssl.called)
         mock_smtp.assert_called_once_with(
-            conf.get('smtp', 'SMTP_HOST'),
-            conf.getint('smtp', 'SMTP_PORT'),
+            host=conf.get('smtp', 'SMTP_HOST'),
+            port=conf.getint('smtp', 'SMTP_PORT'),
+            timeout=conf.getint('smtp', 'SMTP_TIMEOUT'),
         )
         self.assertFalse(mock_smtp.login.called)
 
@@ -226,3 +227,89 @@ class TestEmailSmtp(unittest.TestCase):
         utils.email.send_mime_email('from', 'to', MIMEMultipart(), dryrun=True)
         self.assertFalse(mock_smtp.called)
         self.assertFalse(mock_smtp_ssl.called)
+
+    @mock.patch('smtplib.SMTP_SSL')
+    @mock.patch('smtplib.SMTP')
+    def test_send_mime_complete_failure(self, mock_smtp: mock, mock_smtp_ssl):
+        mock_smtp.side_effect = SMTPServerDisconnected()
+        msg = MIMEMultipart()
+        with self.assertRaises(SMTPServerDisconnected):
+            utils.email.send_mime_email('from', 'to', msg, dryrun=False)
+
+        mock_smtp.assert_any_call(
+            host=conf.get('smtp', 'SMTP_HOST'),
+            port=conf.getint('smtp', 'SMTP_PORT'),
+            timeout=conf.getint('smtp', 'SMTP_TIMEOUT'),
+        )
+        self.assertEqual(mock_smtp.call_count, conf.getint('smtp', 'SMTP_RETRY_LIMIT'))
+        self.assertFalse(mock_smtp_ssl.called)
+        self.assertFalse(mock_smtp.return_value.starttls.called)
+        self.assertFalse(mock_smtp.return_value.login.called)
+        self.assertFalse(mock_smtp.return_value.sendmail.called)
+        self.assertFalse(mock_smtp.return_value.quit.called)
+
+    @mock.patch('smtplib.SMTP_SSL')
+    @mock.patch('smtplib.SMTP')
+    def test_send_mime_ssl_complete_failure(self, mock_smtp, mock_smtp_ssl):
+        mock_smtp_ssl.side_effect = SMTPServerDisconnected()
+        msg = MIMEMultipart()
+        with conf_vars({('smtp', 'smtp_ssl'): 'True'}):
+            with self.assertRaises(SMTPServerDisconnected):
+                utils.email.send_mime_email('from', 'to', msg, dryrun=False)
+
+        mock_smtp_ssl.assert_any_call(
+            host=conf.get('smtp', 'SMTP_HOST'),
+            port=conf.getint('smtp', 'SMTP_PORT'),
+            timeout=conf.getint('smtp', 'SMTP_TIMEOUT'),
+        )
+        self.assertEqual(mock_smtp_ssl.call_count, conf.getint('smtp', 'SMTP_RETRY_LIMIT'))
+        self.assertFalse(mock_smtp.called)
+        self.assertFalse(mock_smtp_ssl.return_value.starttls.called)
+        self.assertFalse(mock_smtp_ssl.return_value.login.called)
+        self.assertFalse(mock_smtp_ssl.return_value.sendmail.called)
+        self.assertFalse(mock_smtp_ssl.return_value.quit.called)
+
+    @mock.patch('smtplib.SMTP_SSL')
+    @mock.patch('smtplib.SMTP')
+    def test_send_mime_custom_timeout_retrylimit(self, mock_smtp, mock_smtp_ssl):
+        mock_smtp.side_effect = SMTPServerDisconnected()
+        msg = MIMEMultipart()
+
+        custom_retry_limit = 10
+        custom_timeout = 60
+
+        with conf_vars(
+            {
+                ('smtp', 'smtp_retry_limit'): str(custom_retry_limit),
+                ('smtp', 'smtp_timeout'): str(custom_timeout),
+            }
+        ):
+            with self.assertRaises(SMTPServerDisconnected):
+                utils.email.send_mime_email('from', 'to', msg, dryrun=False)
+
+        mock_smtp.assert_any_call(
+            host=conf.get('smtp', 'SMTP_HOST'), port=conf.getint('smtp', 'SMTP_PORT'), timeout=custom_timeout
+        )
+        self.assertFalse(mock_smtp_ssl.called)
+        self.assertEqual(mock_smtp.call_count, 10)
+
+    @mock.patch('smtplib.SMTP_SSL')
+    @mock.patch('smtplib.SMTP')
+    def test_send_mime_partial_failure(self, mock_smtp, mock_smtp_ssl):
+        final_mock = mock.Mock()
+        side_effects = [SMTPServerDisconnected(), SMTPServerDisconnected(), final_mock]
+        mock_smtp.side_effect = side_effects
+        msg = MIMEMultipart()
+
+        utils.email.send_mime_email('from', 'to', msg, dryrun=False)
+
+        mock_smtp.assert_any_call(
+            host=conf.get('smtp', 'SMTP_HOST'),
+            port=conf.getint('smtp', 'SMTP_PORT'),
+            timeout=conf.getint('smtp', 'SMTP_TIMEOUT'),
+        )
+        self.assertEqual(mock_smtp.call_count, side_effects.index(final_mock) + 1)
+        self.assertFalse(mock_smtp_ssl.called)
+        self.assertTrue(final_mock.starttls.called)
+        final_mock.sendmail.assert_called_once_with('from', 'to', msg.as_string())
+        self.assertTrue(final_mock.quit.called)