You are viewing a plain-text version of this content. The canonical link to it appeared as a hyperlink on the original archive page and is not preserved in this copy.
Posted to commits@superset.apache.org by wi...@apache.org on 2021/02/24 21:32:12 UTC

[superset] branch master updated: feat(reports): send notification on error with grace (#13135)

This is an automated email from the ASF dual-hosted git repository.

willbarrett pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b114fc  feat(reports): send notification on error with grace (#13135)
0b114fc is described below

commit 0b114fcbc58751c84b5e0c1f1a1024e6adf1bed7
Author: Daniel Vaz Gaspar <da...@gmail.com>
AuthorDate: Wed Feb 24 21:31:31 2021 +0000

    feat(reports): send notification on error with grace (#13135)
    
    * fix: add config to disable dataset ownership on the old API
    
    * fix CI docker build
    
    * fix logic
    
    * add deprecation comment on the config
    
    * feat: send alert/report errors to recipients
    
    * update
    
    * feat(reports): send notification on error with grace
    
    * merge and revert config
    
    * fix lint and MySQL test
    
    * fix MySQL tests
---
 superset/reports/commands/execute.py    |  69 ++++++++++++--
 superset/reports/commands/log_prune.py  |   4 +-
 superset/reports/dao.py                 |  38 ++++++++
 superset/reports/notifications/base.py  |   3 +-
 superset/reports/notifications/email.py |  43 +++++----
 superset/reports/notifications/slack.py |  47 +++++-----
 superset/utils/core.py                  |   2 +-
 tests/reports/commands_tests.py         | 153 +++++++++++++++++++++++++++++++-
 8 files changed, 309 insertions(+), 50 deletions(-)

diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py
index ddf4f19..774f2c8 100644
--- a/superset/reports/commands/execute.py
+++ b/superset/reports/commands/execute.py
@@ -44,7 +44,10 @@ from superset.reports.commands.exceptions import (
     ReportScheduleUnexpectedError,
     ReportScheduleWorkingTimeoutError,
 )
-from superset.reports.dao import ReportScheduleDAO
+from superset.reports.dao import (
+    REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
+    ReportScheduleDAO,
+)
 from superset.reports.notifications import create_notification
 from superset.reports.notifications.base import NotificationContent, ScreenshotData
 from superset.reports.notifications.exceptions import NotificationError
@@ -147,6 +150,7 @@ class BaseReportState:
     def _get_screenshot(self) -> ScreenshotData:
         """
         Get a chart or dashboard screenshot
+
         :raises: ReportScheduleScreenshotFailedError
         """
         screenshot: Optional[BaseScreenshot] = None
@@ -170,6 +174,7 @@ class BaseReportState:
     def _get_notification_content(self) -> NotificationContent:
         """
         Gets a notification content, this is composed by a title and a screenshot
+
         :raises: ReportScheduleScreenshotFailedError
         """
         screenshot_data = self._get_screenshot()
@@ -185,14 +190,13 @@ class BaseReportState:
             )
         return NotificationContent(name=name, screenshot=screenshot_data)
 
-    def send(self) -> None:
+    def _send(self, notification_content: NotificationContent) -> None:
         """
-        Creates the notification content and sends them to all recipients
+        Sends a notification to all recipients
 
         :raises: ReportScheduleNotificationError
         """
         notification_errors = []
-        notification_content = self._get_notification_content()
         for recipient in self._report_schedule.recipients:
             notification = create_notification(recipient, notification_content)
             try:
@@ -203,6 +207,24 @@ class BaseReportState:
         if notification_errors:
             raise ReportScheduleNotificationError(";".join(notification_errors))
 
+    def send(self) -> None:
+        """
+        Creates the notification content and sends them to all recipients
+
+        :raises: ReportScheduleNotificationError
+        """
+        notification_content = self._get_notification_content()
+        self._send(notification_content)
+
+    def send_error(self, name: str, message: str) -> None:
+        """
+        Creates and sends a notification for an error, to all recipients
+
+        :raises: ReportScheduleNotificationError
+        """
+        notification_content = NotificationContent(name=name, text=message)
+        self._send(notification_content)
+
     def is_in_grace_period(self) -> bool:
         """
         Checks if an alert is on it's grace period
@@ -218,6 +240,23 @@ class BaseReportState:
             < last_success.end_dttm
         )
 
+    def is_in_error_grace_period(self) -> bool:
+        """
+        Checks if an alert/report on error is on it's notification grace period
+        """
+        last_success = ReportScheduleDAO.find_last_error_notification(
+            self._report_schedule, session=self._session
+        )
+        if not last_success:
+            return False
+        return (
+            last_success is not None
+            and self._report_schedule.grace_period
+            and datetime.utcnow()
+            - timedelta(seconds=self._report_schedule.grace_period)
+            < last_success.end_dttm
+        )
+
     def is_on_working_timeout(self) -> bool:
         """
         Checks if an alert is on a working timeout
@@ -256,9 +295,25 @@ class ReportNotTriggeredErrorState(BaseReportState):
                     return
             self.send()
             self.set_state_and_log(ReportState.SUCCESS)
-        except CommandException as ex:
-            self.set_state_and_log(ReportState.ERROR, error_message=str(ex))
-            raise ex
+        except CommandException as first_ex:
+            self.set_state_and_log(ReportState.ERROR, error_message=str(first_ex))
+            # TODO (dpgaspar) convert this logic to a new state eg: ERROR_ON_GRACE
+            if not self.is_in_error_grace_period():
+                try:
+                    self.send_error(
+                        f"Error occurred for {self._report_schedule.type}:"
+                        f" {self._report_schedule.name}",
+                        str(first_ex),
+                    )
+                    self.set_state_and_log(
+                        ReportState.ERROR,
+                        error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
+                    )
+                except CommandException as second_ex:
+                    self.set_state_and_log(
+                        ReportState.ERROR, error_message=str(second_ex)
+                    )
+            raise first_ex
 
 
 class ReportWorkingState(BaseReportState):
diff --git a/superset/reports/commands/log_prune.py b/superset/reports/commands/log_prune.py
index 4280d1b..4b577b1 100644
--- a/superset/reports/commands/log_prune.py
+++ b/superset/reports/commands/log_prune.py
@@ -50,9 +50,9 @@ class AsyncPruneReportScheduleLogCommand(BaseCommand):
                             report_schedule, from_date, session=session, commit=False
                         )
                         logger.info(
-                            "Deleted %s logs for %s",
+                            "Deleted %s logs for report schedule id: %s",
                             str(row_count),
-                            ReportSchedule.name,
+                            str(report_schedule.id),
                         )
                     except DAODeleteFailedError as ex:
                         prune_errors.append(str(ex))
diff --git a/superset/reports/dao.py b/superset/reports/dao.py
index 4e3e949..5ffa0f0 100644
--- a/superset/reports/dao.py
+++ b/superset/reports/dao.py
@@ -35,6 +35,9 @@ from superset.models.reports import (
 logger = logging.getLogger(__name__)
 
 
+REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER = "Notification sent with error"
+
+
 class ReportScheduleDAO(BaseDAO):
     model_cls = ReportSchedule
 
@@ -224,6 +227,41 @@ class ReportScheduleDAO(BaseDAO):
         )
 
     @staticmethod
+    def find_last_error_notification(
+        report_schedule: ReportSchedule, session: Optional[Session] = None,
+    ) -> Optional[ReportExecutionLog]:
+        """
+        Finds last error email sent
+        """
+        session = session or db.session
+        last_error_email_log = (
+            session.query(ReportExecutionLog)
+            .filter(
+                ReportExecutionLog.error_message
+                == REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
+                ReportExecutionLog.report_schedule == report_schedule,
+            )
+            .order_by(ReportExecutionLog.end_dttm.desc())
+            .first()
+        )
+        if not last_error_email_log:
+            return None
+        # Checks that only errors have occurred since the last email
+        report_from_last_email = (
+            session.query(ReportExecutionLog)
+            .filter(
+                ReportExecutionLog.state.notin_(
+                    [ReportState.ERROR, ReportState.WORKING]
+                ),
+                ReportExecutionLog.report_schedule == report_schedule,
+                ReportExecutionLog.end_dttm < last_error_email_log.end_dttm,
+            )
+            .order_by(ReportExecutionLog.end_dttm.desc())
+            .first()
+        )
+        return last_error_email_log if not report_from_last_email else None
+
+    @staticmethod
     def bulk_delete_logs(
         model: ReportSchedule,
         from_date: datetime,
diff --git a/superset/reports/notifications/base.py b/superset/reports/notifications/base.py
index f55154c..5fe7fe9 100644
--- a/superset/reports/notifications/base.py
+++ b/superset/reports/notifications/base.py
@@ -30,7 +30,8 @@ class ScreenshotData:
 @dataclass
 class NotificationContent:
     name: str
-    screenshot: ScreenshotData
+    screenshot: Optional[ScreenshotData] = None
+    text: Optional[str] = None
 
 
 class BaseNotification:  # pylint: disable=too-few-public-methods
diff --git a/superset/reports/notifications/email.py b/superset/reports/notifications/email.py
index e99a7f4..f2bd6e6 100644
--- a/superset/reports/notifications/email.py
+++ b/superset/reports/notifications/email.py
@@ -19,7 +19,7 @@ import json
 import logging
 from dataclasses import dataclass
 from email.utils import make_msgid, parseaddr
-from typing import Dict
+from typing import Dict, Optional
 
 from flask_babel import gettext as __
 
@@ -35,7 +35,7 @@ logger = logging.getLogger(__name__)
 @dataclass
 class EmailContent:
     body: str
-    images: Dict[str, bytes]
+    images: Optional[Dict[str, bytes]] = None
 
 
 class EmailNotification(BaseNotification):  # pylint: disable=too-few-public-methods
@@ -49,22 +49,35 @@ class EmailNotification(BaseNotification):  # pylint: disable=too-few-public-met
     def _get_smtp_domain() -> str:
         return parseaddr(app.config["SMTP_MAIL_FROM"])[1].split("@")[1]
 
-    def _get_content(self) -> EmailContent:
-        # Get the domain from the 'From' address ..
-        # and make a message id without the < > in the ends
-        domain = self._get_smtp_domain()
-        msgid = make_msgid(domain)[1:-1]
-
-        image = {msgid: self._content.screenshot.image}
-        body = __(
+    @staticmethod
+    def _error_template(text: str) -> str:
+        return __(
             """
-            <b><a href="%(url)s">Explore in Superset</a></b><p></p>
-            <img src="cid:%(msgid)s">
+            Error: %(text)s
             """,
-            url=self._content.screenshot.url,
-            msgid=msgid,
+            text=text,
         )
-        return EmailContent(body=body, images=image)
+
+    def _get_content(self) -> EmailContent:
+        if self._content.text:
+            return EmailContent(body=self._error_template(self._content.text))
+        # Get the domain from the 'From' address ..
+        # and make a message id without the < > in the end
+        if self._content.screenshot:
+            domain = self._get_smtp_domain()
+            msgid = make_msgid(domain)[1:-1]
+
+            image = {msgid: self._content.screenshot.image}
+            body = __(
+                """
+                <b><a href="%(url)s">Explore in Superset</a></b><p></p>
+                <img src="cid:%(msgid)s">
+                """,
+                url=self._content.screenshot.url,
+                msgid=msgid,
+            )
+            return EmailContent(body=body, images=image)
+        return EmailContent(body=self._error_template("Unexpected missing screenshot"))
 
     def _get_subject(self) -> str:
         return __(
diff --git a/superset/reports/notifications/slack.py b/superset/reports/notifications/slack.py
index 8e859ff..84f858c 100644
--- a/superset/reports/notifications/slack.py
+++ b/superset/reports/notifications/slack.py
@@ -18,13 +18,12 @@
 import json
 import logging
 from io import IOBase
-from typing import cast, Optional, Union
+from typing import Optional, Union
 
 from flask_babel import gettext as __
 from retry.api import retry
 from slack import WebClient
 from slack.errors import SlackApiError, SlackClientError
-from slack.web.slack_response import SlackResponse
 
 from superset import app
 from superset.models.reports import ReportRecipientType
@@ -44,46 +43,52 @@ class SlackNotification(BaseNotification):  # pylint: disable=too-few-public-met
     def _get_channel(self) -> str:
         return json.loads(self._recipient.recipient_config_json)["target"]
 
-    def _get_body(self) -> str:
+    @staticmethod
+    def _error_template(name: str, text: str) -> str:
         return __(
             """
             *%(name)s*\n
-            <%(url)s|Explore in Superset>
+            Error: %(text)s
             """,
-            name=self._content.name,
-            url=self._content.screenshot.url,
+            name=name,
+            text=text,
         )
 
+    def _get_body(self) -> str:
+        if self._content.text:
+            return self._error_template(self._content.name, self._content.text)
+        if self._content.screenshot:
+            return __(
+                """
+                *%(name)s*\n
+                <%(url)s|Explore in Superset>
+                """,
+                name=self._content.name,
+                url=self._content.screenshot.url,
+            )
+        return self._error_template(self._content.name, "Unexpected missing screenshot")
+
     def _get_inline_screenshot(self) -> Optional[Union[str, IOBase, bytes]]:
-        return self._content.screenshot.image
+        if self._content.screenshot:
+            return self._content.screenshot.image
+        return None
 
     @retry(SlackApiError, delay=10, backoff=2, tries=5)
     def send(self) -> None:
         file = self._get_inline_screenshot()
         channel = self._get_channel()
         body = self._get_body()
-
         try:
             client = WebClient(
                 token=app.config["SLACK_API_TOKEN"], proxy=app.config["SLACK_PROXY"]
             )
             # files_upload returns SlackResponse as we run it in sync mode.
             if file:
-                response = cast(
-                    SlackResponse,
-                    client.files_upload(
-                        channels=channel,
-                        file=file,
-                        initial_comment=body,
-                        title="subject",
-                    ),
+                client.files_upload(
+                    channels=channel, file=file, initial_comment=body, title="subject",
                 )
-                assert response["file"], str(response)  # the uploaded file
             else:
-                response = cast(
-                    SlackResponse, client.chat_postMessage(channel=channel, text=body),
-                )
-                assert response["message"]["text"], str(response)
+                client.chat_postMessage(channel=channel, text=body)
             logger.info("Report sent to slack")
         except SlackClientError as ex:
             raise NotificationError(ex)
diff --git a/superset/utils/core.py b/superset/utils/core.py
index 1b5ef70..4efad98 100644
--- a/superset/utils/core.py
+++ b/superset/utils/core.py
@@ -958,7 +958,7 @@ def send_mime_email(
             smtp.starttls()
         if smtp_user and smtp_password:
             smtp.login(smtp_user, smtp_password)
-        logger.info("Sent an email to %s", str(e_to))
+        logger.debug("Sent an email to %s", str(e_to))
         smtp.sendmail(e_from, e_to, mime_msg.as_string())
         smtp.quit()
     else:
diff --git a/tests/reports/commands_tests.py b/tests/reports/commands_tests.py
index e25b6ec..1de97eb 100644
--- a/tests/reports/commands_tests.py
+++ b/tests/reports/commands_tests.py
@@ -21,6 +21,7 @@ from unittest.mock import Mock, patch
 
 import pytest
 from contextlib2 import contextmanager
+from flask_sqlalchemy import BaseQuery
 from freezegun import freeze_time
 from sqlalchemy.sql import func
 
@@ -62,13 +63,34 @@ pytestmark = pytest.mark.usefixtures(
 )
 
 
-def get_target_from_report_schedule(report_schedule) -> List[str]:
+def get_target_from_report_schedule(report_schedule: ReportSchedule) -> List[str]:
     return [
         json.loads(recipient.recipient_config_json)["target"]
         for recipient in report_schedule.recipients
     ]
 
 
+def get_error_logs_query(report_schedule: ReportSchedule) -> BaseQuery:
+    return (
+        db.session.query(ReportExecutionLog)
+        .filter(
+            ReportExecutionLog.report_schedule == report_schedule,
+            ReportExecutionLog.state == ReportState.ERROR,
+        )
+        .order_by(ReportExecutionLog.end_dttm.desc())
+    )
+
+
+def get_notification_error_sent_count(report_schedule: ReportSchedule) -> int:
+    logs = get_error_logs_query(report_schedule).all()
+    notification_sent_logs = [
+        log.error_message
+        for log in logs
+        if log.error_message == "Notification sent with error"
+    ]
+    return len(notification_sent_logs)
+
+
 def assert_log(state: str, error_message: Optional[str] = None):
     db.session.commit()
     logs = db.session.query(ReportExecutionLog).all()
@@ -77,7 +99,11 @@ def assert_log(state: str, error_message: Optional[str] = None):
         assert logs[0].error_message == error_message
         assert logs[0].state == state
         return
-    assert len(logs) == 2
+    # On error we send an email
+    if state == ReportState.ERROR:
+        assert len(logs) == 3
+    else:
+        assert len(logs) == 2
     log_states = [log.state for log in logs]
     assert ReportState.WORKING in log_states
     assert state in log_states
@@ -94,6 +120,7 @@ def create_report_notification(
     report_type: Optional[str] = None,
     validator_type: Optional[str] = None,
     validator_config_json: Optional[str] = None,
+    grace_period: Optional[int] = None,
 ) -> ReportSchedule:
     report_type = report_type or ReportScheduleType.REPORT
     target = email_target or slack_channel
@@ -121,6 +148,7 @@ def create_report_notification(
         recipients=[recipient],
         validator_type=validator_type,
         validator_config_json=validator_config_json,
+        grace_period=grace_period,
     )
     return report_schedule
 
@@ -464,6 +492,7 @@ def create_invalid_sql_alert_email_chart(request):
                 validator_config_json=param_config[request.param][
                     "validator_config_json"
                 ],
+                grace_period=60 * 60,
             )
             yield report_schedule
 
@@ -766,7 +795,8 @@ def test_email_mul_alert(create_mul_alert_email_chart):
 
 
 @pytest.mark.usefixtures("create_invalid_sql_alert_email_chart")
-def test_invalid_sql_alert(create_invalid_sql_alert_email_chart):
+@patch("superset.reports.notifications.email.send_email_smtp")
+def test_invalid_sql_alert(email_mock, create_invalid_sql_alert_email_chart):
     """
     ExecuteReport Command: Test alert with invalid SQL statements
     """
@@ -775,3 +805,120 @@ def test_invalid_sql_alert(create_invalid_sql_alert_email_chart):
             AsyncExecuteReportScheduleCommand(
                 create_invalid_sql_alert_email_chart.id, datetime.utcnow()
             ).run()
+
+        notification_targets = get_target_from_report_schedule(
+            create_invalid_sql_alert_email_chart
+        )
+        # Assert the email smtp address, asserts a notification was sent with the error
+        assert email_mock.call_args[0][0] == notification_targets[0]
+
+
+@pytest.mark.usefixtures("create_invalid_sql_alert_email_chart")
+@patch("superset.reports.notifications.email.send_email_smtp")
+def test_grace_period_error(email_mock, create_invalid_sql_alert_email_chart):
+    """
+    ExecuteReport Command: Test alert grace period on error
+    """
+    with freeze_time("2020-01-01T00:00:00Z"):
+        with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+            AsyncExecuteReportScheduleCommand(
+                create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+            ).run()
+
+        # Only needed for MySQL, understand why
+        db.session.commit()
+        notification_targets = get_target_from_report_schedule(
+            create_invalid_sql_alert_email_chart
+        )
+        # Assert the email smtp address, asserts a notification was sent with the error
+        assert email_mock.call_args[0][0] == notification_targets[0]
+        assert (
+            get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 1
+        )
+
+    with freeze_time("2020-01-01T00:30:00Z"):
+        with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+            AsyncExecuteReportScheduleCommand(
+                create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+            ).run()
+        db.session.commit()
+        assert (
+            get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 1
+        )
+
+    # Grace period ends, assert a notification was sent
+    with freeze_time("2020-01-01T01:30:00Z"):
+        with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+            AsyncExecuteReportScheduleCommand(
+                create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+            ).run()
+        db.session.commit()
+        assert (
+            get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 2
+        )
+
+
+@pytest.mark.usefixtures("create_invalid_sql_alert_email_chart")
+@patch("superset.reports.notifications.email.send_email_smtp")
+@patch("superset.utils.screenshots.ChartScreenshot.compute_and_cache")
+def test_grace_period_error_flap(
+    screenshot_mock, email_mock, create_invalid_sql_alert_email_chart
+):
+    """
+    ExecuteReport Command: Test alert grace period on error
+    """
+    with freeze_time("2020-01-01T00:00:00Z"):
+        with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+            AsyncExecuteReportScheduleCommand(
+                create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+            ).run()
+        db.session.commit()
+        # Assert we have 1 notification sent on the log
+        assert (
+            get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 1
+        )
+
+    with freeze_time("2020-01-01T00:30:00Z"):
+        with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+            AsyncExecuteReportScheduleCommand(
+                create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+            ).run()
+        db.session.commit()
+        assert (
+            get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 1
+        )
+
+    # Change report_schedule to valid
+    create_invalid_sql_alert_email_chart.sql = "SELECT 1 AS metric"
+    create_invalid_sql_alert_email_chart.grace_period = 0
+    db.session.merge(create_invalid_sql_alert_email_chart)
+    db.session.commit()
+
+    with freeze_time("2020-01-01T00:31:00Z"):
+        # One success
+        AsyncExecuteReportScheduleCommand(
+            create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+        ).run()
+        # Grace period ends
+        AsyncExecuteReportScheduleCommand(
+            create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+        ).run()
+
+        db.session.commit()
+
+    create_invalid_sql_alert_email_chart.sql = "SELECT 'first'"
+    create_invalid_sql_alert_email_chart.grace_period = 10
+    db.session.merge(create_invalid_sql_alert_email_chart)
+    db.session.commit()
+
+    # assert that after a success, when back to error we send the error notification
+    # again
+    with freeze_time("2020-01-01T00:32:00Z"):
+        with pytest.raises((AlertQueryError, AlertQueryInvalidTypeError)):
+            AsyncExecuteReportScheduleCommand(
+                create_invalid_sql_alert_email_chart.id, datetime.utcnow()
+            ).run()
+        db.session.commit()
+        assert (
+            get_notification_error_sent_count(create_invalid_sql_alert_email_chart) == 2
+        )