You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by ma...@apache.org on 2018/12/11 06:29:38 UTC
[incubator-superset] branch master updated: [SIP-3] Scheduled email
reports for Slices / Dashboards (#5294)
This is an automated email from the ASF dual-hosted git repository.
maximebeauchemin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git
The following commit(s) were added to refs/heads/master by this push:
new 8086224 [SIP-3] Scheduled email reports for Slices / Dashboards (#5294)
8086224 is described below
commit 808622414c0f901bbecc53678fca0085af41a04c
Author: Mahendra M <ma...@affirm.com>
AuthorDate: Mon Dec 10 22:29:29 2018 -0800
[SIP-3] Scheduled email reports for Slices / Dashboards (#5294)
* [scheduled reports] Add support for scheduled reports
* Scheduled email reports for slice and dashboard visualization
(attachment or inline)
* Scheduled email reports for slice data (CSV attachment or inline table)
* Each schedule has a list of recipients (all of them can receive a single mail,
or separate mails)
* All outgoing mails can have a mandatory bcc - for audit purposes.
* Each dashboard/slice can have multiple schedules.
In addition, this PR also makes a few minor improvements to the celery
infrastructure.
* Create a common celery app
* Added more celery annotations for the tasks
* Introduced celery beat
* Update docs about concurrency / pools
* [scheduled reports] - Debug mode for scheduled emails
* [scheduled reports] - Ability to send test mails
* [scheduled reports] - Test email functionality - minor improvements
* [scheduled reports] - Rebase with master. Minor fixes
* [scheduled reports] - Add warning messages
* [scheduled reports] - flake8
* [scheduled reports] - fix rebase
* [scheduled reports] - fix rebase
* [scheduled reports] - fix flake8
* [scheduled reports] Rebase in prep for merge
* Fixed alembic tree after rebase
* Updated requirements to latest version of packages (and tested)
* Removed py2 stuff
* [scheduled reports] - fix flake8
* [scheduled reports] - address review comments
* [scheduled reports] - rebase with master
---
.gitignore | 9 +
docs/faq.rst | 2 +-
docs/installation.rst | 116 +++++-
requirements.txt | 3 +
setup.py | 3 +
superset/config.py | 91 ++++-
.../6c7537a6004a_models_for_email_reports.py | 69 ++++
superset/models/__init__.py | 1 +
superset/models/schedules.py | 93 +++++
superset/sql_lab.py | 9 +-
superset/tasks/__init__.py | 2 +
superset/tasks/celery_app.py | 11 +
superset/tasks/schedules.py | 441 +++++++++++++++++++++
.../templates/superset/reports/slice_data.html | 31 ++
superset/utils/core.py | 33 +-
superset/views/__init__.py | 1 +
superset/views/core.py | 1 -
superset/views/schedules.py | 279 +++++++++++++
tests/email_tests.py | 35 ++
tests/fixtures/sample.png | Bin 0 -> 4481 bytes
tests/fixtures/trends.csv | 3 +
tests/schedules_test.py | 368 +++++++++++++++++
tests/utils.py | 8 +-
23 files changed, 1569 insertions(+), 40 deletions(-)
diff --git a/.gitignore b/.gitignore
index c6cc07b..2ace393 100644
--- a/.gitignore
+++ b/.gitignore
@@ -44,3 +44,12 @@ yarn-error.log
*.iml
venv
@eaDir/
+
+# Test data
+celery_results.sqlite
+celerybeat-schedule
+celerydb.sqlite
+celerybeat.pid
+geckodriver.log
+ghostdriver.log
+testCSV.csv
diff --git a/docs/faq.rst b/docs/faq.rst
index 3b69044..9deba3c 100644
--- a/docs/faq.rst
+++ b/docs/faq.rst
@@ -88,7 +88,7 @@ It's easy: use the ``Filter Box`` widget, build a slice, and add it to your
dashboard.
The ``Filter Box`` widget allows you to define a query to populate dropdowns
-that can be use for filtering. To build the list of distinct values, we
+that can be used for filtering. To build the list of distinct values, we
run a query, and sort the result by the metric you provide, sorting
descending.
diff --git a/docs/installation.rst b/docs/installation.rst
index 7224cd2..fd6efec 100644
--- a/docs/installation.rst
+++ b/docs/installation.rst
@@ -603,14 +603,12 @@ Upgrading should be as straightforward as running::
superset db upgrade
superset init
-SQL Lab
--------
-SQL Lab is a powerful SQL IDE that works with all SQLAlchemy compatible
-databases. By default, queries are executed in the scope of a web
-request so they
-may eventually timeout as queries exceed the maximum duration of a web
-request in your environment, whether it'd be a reverse proxy or the Superset
-server itself.
+Celery Tasks
+------------
+On large analytic databases, it's common to run background jobs, reports
+and/or queries that execute for minutes or hours. In certain cases, we need
+to support long running tasks that execute beyond the typical web request's
+timeout (30-60 seconds).
On large analytic databases, it's common to run queries that
execute for minutes or hours.
@@ -634,15 +632,41 @@ have the same configuration.
class CeleryConfig(object):
BROKER_URL = 'redis://localhost:6379/0'
- CELERY_IMPORTS = ('superset.sql_lab', )
+ CELERY_IMPORTS = (
+ 'superset.sql_lab',
+ 'superset.tasks',
+ )
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
- CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
+ CELERYD_LOG_LEVEL = 'DEBUG'
+ CELERYD_PREFETCH_MULTIPLIER = 10
+ CELERY_ACKS_LATE = True
+ CELERY_ANNOTATIONS = {
+ 'sql_lab.get_sql_results': {
+ 'rate_limit': '100/s',
+ },
+ 'email_reports.send': {
+ 'rate_limit': '1/s',
+ 'time_limit': 150,
+ 'soft_time_limit': 120,
+ 'ignore_result': True,
+ },
+ }
+ CELERYBEAT_SCHEDULE = {
+ 'email_reports.schedule_hourly': {
+ 'task': 'email_reports.schedule_hourly',
+ 'schedule': crontab(minute=1, hour='*'),
+ },
+ }
CELERY_CONFIG = CeleryConfig
-To start a Celery worker to leverage the configuration run: ::
+* To start a Celery worker that uses this configuration, run: ::
+
+ celery worker --app=superset.tasks.celery_app:app --pool=prefork -Ofair -c 4
- celery worker --app=superset.sql_lab:celery_app --pool=gevent -Ofair
+* To start the scheduler that triggers periodic background jobs, run: ::
+
+ celery beat --app=superset.tasks.celery_app:app
To setup a result backend, you need to pass an instance of a derivative
of ``werkzeug.contrib.cache.BaseCache`` to the ``RESULTS_BACKEND``
@@ -665,11 +689,65 @@ look something like:
RESULTS_BACKEND = RedisCache(
host='localhost', port=6379, key_prefix='superset_results')
-Note that it's important that all the worker nodes and web servers in
-the Superset cluster share a common metadata database.
-This means that SQLite will not work in this context since it has
-limited support for concurrency and
-typically lives on the local file system.
+**Important notes**
+
+* It is important that all the worker nodes and web servers in
+ the Superset cluster share a common metadata database.
+ This means that SQLite will not work in this context since it has
+ limited support for concurrency and
+ typically lives on the local file system.
+
+* There should only be one instance of ``celery beat`` running in your
+ entire setup. If not, background jobs can get scheduled multiple times
+ resulting in weird behaviors like duplicate delivery of reports,
+ higher than expected load / traffic etc.
+
+
+Email Reports
+-------------
+Email reports allow users to schedule periodic emails of
+
+* slice and dashboard visualizations (as an attachment or inline)
+* slice data (as a CSV attachment or an inline table)
+
+Schedules are defined in crontab format and each schedule
+can have a list of recipients (all of them can receive a single mail,
+or separate mails). For audit purposes, all outgoing mails can have a
+mandatory bcc.
+
+**Requirements**
+
+* A selenium compatible driver & headless browser
+
+ * `geckodriver <https://github.com/mozilla/geckodriver>`_ and Firefox are preferred
+ * `chromedriver <http://chromedriver.chromium.org/>`_ is a good option too
+* Run ``celery worker`` and ``celery beat`` as follows: ::
+
+ celery worker --app=superset.tasks.celery_app:app --pool=prefork -Ofair -c 4
+ celery beat --app=superset.tasks.celery_app:app
+
+**Important notes**
+
+* Be mindful of the concurrency setting for celery (using ``-c 4``).
+ Selenium/webdriver instances can consume a lot of CPU / memory on your servers.
+
+* In some cases, if you notice a lot of leaked ``geckodriver`` processes, try running
+ your celery processes with ::
+
+ celery worker --pool=prefork --max-tasks-per-child=128 ...
+
+* It is recommended to run separate workers for ``sql_lab`` and
+ ``email_reports`` tasks. This can be done by using the ``queue`` field in ``CELERY_ANNOTATIONS``.
+
+SQL Lab
+-------
+SQL Lab is a powerful SQL IDE that works with all SQLAlchemy compatible
+databases. By default, queries are executed in the scope of a web
+request so they may eventually timeout as queries exceed the maximum duration of a web
+request in your environment, whether it'd be a reverse proxy or the Superset
+server itself. In such cases, it is preferred to use ``celery`` to run the queries
+in the background. Please follow the examples/notes mentioned above to get your
+celery setup working.
Also note that SQL Lab supports Jinja templating in queries and that it's
possible to overload
@@ -684,6 +762,8 @@ in this dictionary are made available for users to use in their SQL.
}
+Celery Flower
+-------------
Flower is a web based tool for monitoring the Celery cluster which you can
install from pip: ::
@@ -691,7 +771,7 @@ install from pip: ::
and run via: ::
- celery flower --app=superset.sql_lab:celery_app
+ celery flower --app=superset.tasks.celery_app:app
Building from source
---------------------
diff --git a/requirements.txt b/requirements.txt
index 081ca70..fedd2b1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -20,6 +20,7 @@ chardet==3.0.4 # via requests
click==6.7
colorama==0.3.9
contextlib2==0.5.5
+croniter==0.3.26
cryptography==1.9
defusedxml==0.5.0 # via python3-openid
docutils==0.14 # via botocore
@@ -69,9 +70,11 @@ python3-openid==3.1.0 # via flask-openid
pytz==2018.5 # via babel, celery, flower, pandas
pyyaml==3.13
requests==2.18.4
+retry==0.9.2
rfc3986==1.1.0 # via tableschema
s3transfer==0.1.13 # via boto3
sasl==0.2.1 # via thrift-sasl
+selenium==3.141.0
simplejson==3.15.0
six==1.11.0 # via bleach, cryptography, isodate, jsonlines, linear-tsv, pathlib2, polyline, pydruid, python-dateutil, sasl, sqlalchemy-utils, tableschema, tabulator, thrift
sqlalchemy-utils==0.32.21
diff --git a/setup.py b/setup.py
index 542ca3f..1b55314 100644
--- a/setup.py
+++ b/setup.py
@@ -94,6 +94,9 @@ setup(
'thrift-sasl>=0.2.1',
'unicodecsv',
'unidecode>=0.04.21',
+ 'croniter==0.3.25',
+ 'selenium==3.14.0',
+ 'retry==0.9.2',
],
extras_require={
'cors': ['flask-cors>=2.0.0'],
diff --git a/superset/config.py b/superset/config.py
index 1613e75..37afdd5 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -11,6 +11,7 @@ import json
import os
import sys
+from celery.schedules import crontab
from dateutil import tz
from flask_appbuilder.security.manager import AUTH_DB
@@ -309,19 +310,43 @@ WARNING_MSG = None
# Default celery config is to use SQLA as a broker, in a production setting
# you'll want to use a proper broker as specified here:
# http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
-"""
-# Example:
+
+
class CeleryConfig(object):
- BROKER_URL = 'sqla+sqlite:///celerydb.sqlite'
- CELERY_IMPORTS = ('superset.sql_lab', )
- CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite'
- CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
- CELERYD_LOG_LEVEL = 'DEBUG'
- CELERYD_PREFETCH_MULTIPLIER = 1
- CELERY_ACKS_LATE = True
+ BROKER_URL = 'sqla+sqlite:///celerydb.sqlite'
+ CELERY_IMPORTS = (
+ 'superset.sql_lab',
+ 'superset.tasks',
+ )
+ CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite'
+ CELERYD_LOG_LEVEL = 'DEBUG'
+ CELERYD_PREFETCH_MULTIPLIER = 1
+ CELERY_ACKS_LATE = True
+ CELERY_ANNOTATIONS = {
+ 'sql_lab.get_sql_results': {
+ 'rate_limit': '100/s',
+ },
+ 'email_reports.send': {
+ 'rate_limit': '1/s',
+ 'time_limit': 120,
+ 'soft_time_limit': 150,
+ 'ignore_result': True,
+ },
+ }
+ CELERYBEAT_SCHEDULE = {
+ 'email_reports.schedule_hourly': {
+ 'task': 'email_reports.schedule_hourly',
+ 'schedule': crontab(minute=1, hour='*'),
+ },
+ }
+
+
CELERY_CONFIG = CeleryConfig
+
"""
+# Set celery config to None to disable all the above configuration
CELERY_CONFIG = None
+"""
# static http headers to be served by your Superset server.
# This header prevents iFrames from other domains and
@@ -463,6 +488,54 @@ SQL_QUERY_MUTATOR = None
# using flask-compress
ENABLE_FLASK_COMPRESS = True
+# Enable / disable scheduled email reports
+# (checked by the hourly celery beat job before anything is scheduled)
+ENABLE_SCHEDULED_EMAIL_REPORTS = False
+
+# If enabled, certain features are run in debug mode
+# Current list:
+# * Emails are sent using dry-run mode (logging only)
+SCHEDULED_EMAIL_DEBUG_MODE = False
+
+# Email reports - minimum time resolution (in minutes) for the crontab
+# Runs of one schedule closer together than this are skipped.
+EMAIL_REPORTS_CRON_RESOLUTION = 15
+
+# Email report configuration
+# From address in emails
+# NOTE(review): the report code in this change builds the From header from
+# SMTP_MAIL_FROM; confirm where this setting is actually read.
+EMAIL_REPORT_FROM_ADDRESS = 'reports@superset.org'
+
+# Send bcc of all reports to this address. Set to None to disable.
+# This is useful for maintaining an audit trail of all email deliveries.
+EMAIL_REPORT_BCC_ADDRESS = None
+
+# User credentials to use for generating reports
+# This user should have permissions to browse all the dashboards and
+# slices.
+# TODO: In the future, login as the owner of the item to generate reports
+EMAIL_REPORTS_USER = 'admin'
+# Prepended to the dashboard / slice title to build the email subject
+EMAIL_REPORTS_SUBJECT_PREFIX = '[Report] '
+
+# The webdriver to use for generating reports. Use one of the following
+# firefox:
+# Requires: geckodriver and firefox installations
+# Limitations: can be buggy at times
+# chrome:
+# Requires: headless chrome
+# Limitations: unable to generate screenshots of elements
+EMAIL_REPORTS_WEBDRIVER = 'firefox'
+
+# Window size - this will impact the rendering of the data
+# Each value is unpacked into driver.set_window_size(width, height)
+WEBDRIVER_WINDOW = {
+ 'dashboard': (1600, 2000),
+ 'slice': (3000, 1200),
+}
+
+# Any config options to be passed as-is to the webdriver
+# (merged into the keyword arguments of the webdriver constructor)
+WEBDRIVER_CONFIGURATION = {}
+
+# The base URL to query for accessing the user interface
+# (Flask url_for() paths are joined onto this URL)
+WEBDRIVER_BASEURL = 'http://0.0.0.0:8080/'
+
+
try:
if CONFIG_PATH_ENV_VAR in os.environ:
# Explicitly import config module that is not in pythonpath; useful
diff --git a/superset/migrations/versions/6c7537a6004a_models_for_email_reports.py b/superset/migrations/versions/6c7537a6004a_models_for_email_reports.py
new file mode 100644
index 0000000..fdfbf8c
--- /dev/null
+++ b/superset/migrations/versions/6c7537a6004a_models_for_email_reports.py
@@ -0,0 +1,69 @@
+"""models for email reports
+
+Revision ID: 6c7537a6004a
+Revises: a61b40f9f57f
+Create Date: 2018-05-15 20:28:51.977572
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '6c7537a6004a'
+down_revision = 'a61b40f9f57f'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+ """Create the dashboard and slice email schedule tables."""
+ # ### commands auto generated by Alembic - please adjust! ###
+ # Table backing DashboardEmailSchedule (superset/models/schedules.py)
+ op.create_table('dashboard_email_schedules',
+ sa.Column('created_on', sa.DateTime(), nullable=True),
+ sa.Column('changed_on', sa.DateTime(), nullable=True),
+ sa.Column('id', sa.Integer(), nullable=False),
+ sa.Column('active', sa.Boolean(), nullable=True),
+ sa.Column('crontab', sa.String(length=50), nullable=True),
+ sa.Column('recipients', sa.Text(), nullable=True),
+ sa.Column('deliver_as_group', sa.Boolean(), nullable=True),
+ sa.Column('delivery_type', sa.Enum('attachment', 'inline', name='emaildeliverytype'), nullable=True),
+ sa.Column('dashboard_id', sa.Integer(), nullable=True),
+ sa.Column('created_by_fk', sa.Integer(), nullable=True),
+ sa.Column('changed_by_fk', sa.Integer(), nullable=True),
+ sa.Column('user_id', sa.Integer(), nullable=True),
+ sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
+ sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
+ sa.ForeignKeyConstraint(['dashboard_id'], ['dashboards.id'], ),
+ sa.ForeignKeyConstraint(['user_id'], ['ab_user.id'], ),
+ sa.PrimaryKeyConstraint('id')
+ )
+ op.create_index(op.f('ix_dashboard_email_schedules_active'), 'dashboard_email_schedules', ['active'], unique=False)
+ # Table backing SliceEmailSchedule: same columns, plus slice_id and
+ # email_format instead of dashboard_id
+ op.create_table('slice_email_schedules',
+ sa.Column('created_on', sa.DateTime(), nullable=True),
+ sa.Column('changed_on', sa.DateTime(), nullable=True),
+ sa.Column('id', sa.Integer(), nullable=False),
+ sa.Column('active', sa.Boolean(), nullable=True),
+ sa.Column('crontab', sa.String(length=50), nullable=True),
+ sa.Column('recipients', sa.Text(), nullable=True),
+ sa.Column('deliver_as_group', sa.Boolean(), nullable=True),
+ sa.Column('delivery_type', sa.Enum('attachment', 'inline', name='emaildeliverytype'), nullable=True),
+ sa.Column('slice_id', sa.Integer(), nullable=True),
+ sa.Column('email_format', sa.Enum('visualization', 'data', name='sliceemailreportformat'), nullable=True),
+ sa.Column('created_by_fk', sa.Integer(), nullable=True),
+ sa.Column('changed_by_fk', sa.Integer(), nullable=True),
+ sa.Column('user_id', sa.Integer(), nullable=True),
+ sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
+ sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
+ sa.ForeignKeyConstraint(['slice_id'], ['slices.id'], ),
+ sa.ForeignKeyConstraint(['user_id'], ['ab_user.id'], ),
+ sa.PrimaryKeyConstraint('id')
+ )
+ op.create_index(op.f('ix_slice_email_schedules_active'), 'slice_email_schedules', ['active'], unique=False)
+ # ### end Alembic commands ###
+
+
+def downgrade():
+ """Drop the email schedule tables, in reverse order of creation."""
+ # NOTE(review): the emaildeliverytype / sliceemailreportformat enum types
+ # are not dropped here; on backends with native enum types (e.g.
+ # PostgreSQL) they may be left behind -- confirm.
+ # ### commands auto generated by Alembic - please adjust! ###
+ op.drop_index(op.f('ix_slice_email_schedules_active'), table_name='slice_email_schedules')
+ op.drop_table('slice_email_schedules')
+ op.drop_index(op.f('ix_dashboard_email_schedules_active'), table_name='dashboard_email_schedules')
+ op.drop_table('dashboard_email_schedules')
+ # ### end Alembic commands ###
diff --git a/superset/models/__init__.py b/superset/models/__init__.py
index ed8b591..437d194 100644
--- a/superset/models/__init__.py
+++ b/superset/models/__init__.py
@@ -1,3 +1,4 @@
from . import core # noqa
from . import sql_lab # noqa
from . import user_attributes # noqa
+from . import schedules # noqa
diff --git a/superset/models/schedules.py b/superset/models/schedules.py
new file mode 100644
index 0000000..fe9997d
--- /dev/null
+++ b/superset/models/schedules.py
@@ -0,0 +1,93 @@
+# pylint: disable=C,R,W
+"""Models for scheduled execution of jobs"""
+
+import enum
+
+from flask_appbuilder import Model
+from sqlalchemy import (
+ Boolean, Column, Enum, ForeignKey, Integer, String, Text,
+)
+from sqlalchemy.ext.declarative import declared_attr
+from sqlalchemy.orm import relationship
+
+from superset import security_manager
+from superset.models.helpers import AuditMixinNullable, ImportMixin
+
+
+metadata = Model.metadata # pylint: disable=no-member
+
+
+class ScheduleType(enum.Enum):
+ """Kinds of schedulable objects; values are the ``report_type`` strings."""
+ slice = 'slice'
+ dashboard = 'dashboard'
+
+
+class EmailDeliveryType(enum.Enum):
+ """How a report reaches the recipient: attached or inline in the body."""
+ attachment = 'Attachment'
+ inline = 'Inline'
+
+
+class SliceEmailReportFormat(enum.Enum):
+ """What a slice report contains: a screenshot or the CSV data."""
+ visualization = 'Visualization'
+ data = 'Raw data'
+
+
+class EmailSchedule():
+
+ """Schedules for emailing slices / dashboards
+
+ Mixin with the columns shared by DashboardEmailSchedule and
+ SliceEmailSchedule; it is not mapped to a table by itself.
+ """
+
+ # Placeholder; each concrete subclass sets its own table name
+ __tablename__ = 'email_schedules'
+
+ id = Column(Integer, primary_key=True)
+ # Inactive schedules are skipped by the scheduling tasks
+ active = Column(Boolean, default=True, index=True)
+ # Cron expression, expanded with croniter when scheduling
+ crontab = Column(String(50))
+
+ # declared_attr members are evaluated for each mapped subclass; the
+ # argument named ``self`` is that subclass, not an instance.
+ @declared_attr
+ def user_id(self):
+ return Column(Integer, ForeignKey('ab_user.id'))
+
+ @declared_attr
+ def user(self):
+ # The backref on the user model is named after the subclass's table
+ return relationship(
+ security_manager.user_model,
+ backref=self.__tablename__,
+ foreign_keys=[self.user_id],
+ )
+
+ # Parsed with get_email_address_list() when sending
+ recipients = Column(Text)
+ # True: one mail to all recipients; False: one mail per recipient
+ deliver_as_group = Column(Boolean, default=False)
+ delivery_type = Column(Enum(EmailDeliveryType))
+
+
+class DashboardEmailSchedule(Model,
+ AuditMixinNullable,
+ ImportMixin,
+ EmailSchedule):
+ """Email schedule for a single dashboard."""
+ __tablename__ = 'dashboard_email_schedules'
+ dashboard_id = Column(Integer, ForeignKey('dashboards.id'))
+ # Exposed on Dashboard as ``email_schedules``
+ dashboard = relationship(
+ 'Dashboard',
+ backref='email_schedules',
+ foreign_keys=[dashboard_id],
+ )
+
+
+class SliceEmailSchedule(Model,
+ AuditMixinNullable,
+ ImportMixin,
+ EmailSchedule):
+ """Email schedule for a single slice."""
+ __tablename__ = 'slice_email_schedules'
+ slice_id = Column(Integer, ForeignKey('slices.id'))
+ # Exposed on Slice as ``email_schedules``
+ slice = relationship(
+ 'Slice',
+ backref='email_schedules',
+ foreign_keys=[slice_id],
+ )
+ # Screenshot of the visualization, or the raw data
+ email_format = Column(Enum(SliceEmailReportFormat))
+
+
+def get_scheduler_model(report_type):
+ if report_type == ScheduleType.dashboard.value:
+ return DashboardEmailSchedule
+ elif report_type == ScheduleType.slice.value:
+ return SliceEmailSchedule
diff --git a/superset/sql_lab.py b/superset/sql_lab.py
index 0969147..0c76c7c 100644
--- a/superset/sql_lab.py
+++ b/superset/sql_lab.py
@@ -14,8 +14,8 @@ from sqlalchemy.pool import NullPool
from superset import app, dataframe, db, results_backend, security_manager
from superset.models.sql_lab import Query
from superset.sql_parse import SupersetQuery
+from superset.tasks.celery_app import app as celery_app
from superset.utils.core import (
- get_celery_app,
json_iso_dttm_ser,
now_as_float,
QueryStatus,
@@ -23,8 +23,7 @@ from superset.utils.core import (
)
config = app.config
-celery_app = get_celery_app(config)
-stats_logger = app.config.get('STATS_LOGGER')
+stats_logger = config.get('STATS_LOGGER')
SQLLAB_TIMEOUT = config.get('SQLLAB_ASYNC_TIME_LIMIT_SEC', 600)
log_query = config.get('QUERY_LOGGER')
@@ -77,7 +76,9 @@ def session_scope(nullpool):
session.close()
-@celery_app.task(bind=True, soft_time_limit=SQLLAB_TIMEOUT)
+@celery_app.task(name='sql_lab.get_sql_results',
+ bind=True,
+ soft_time_limit=SQLLAB_TIMEOUT)
def get_sql_results(
ctask, query_id, rendered_query, return_results=True, store_results=False,
user_name=None, start_time=None):
diff --git a/superset/tasks/__init__.py b/superset/tasks/__init__.py
new file mode 100644
index 0000000..d7259b9
--- /dev/null
+++ b/superset/tasks/__init__.py
@@ -0,0 +1,2 @@
+# -*- coding: utf-8 -*-
+from . import schedules # noqa
diff --git a/superset/tasks/celery_app.py b/superset/tasks/celery_app.py
new file mode 100644
index 0000000..028ce7d
--- /dev/null
+++ b/superset/tasks/celery_app.py
@@ -0,0 +1,11 @@
+# pylint: disable=C,R,W
+
+"""The shared Celery application used by Superset's background tasks"""
+
+# Superset framework imports
+from superset import app
+from superset.utils.core import get_celery_app
+
+# Globals
+config = app.config
+# NOTE: this rebinds ``app`` from the Flask app to the Celery app; workers,
+# beat and flower reference it as ``superset.tasks.celery_app:app``.
+app = get_celery_app(config)
diff --git a/superset/tasks/schedules.py b/superset/tasks/schedules.py
new file mode 100644
index 0000000..a8f1eb1
--- /dev/null
+++ b/superset/tasks/schedules.py
@@ -0,0 +1,441 @@
+# pylint: disable=C,R,W
+
+"""Celery tasks for scheduled email reports of slices and dashboards"""
+
+from collections import namedtuple
+import csv
+from datetime import datetime, timedelta
+from email.utils import make_msgid, parseaddr
+import io
+import logging
+import time
+
+
+import croniter
+from dateutil.tz import tzlocal
+from flask import render_template, Response, session, url_for
+from flask_babel import gettext as __
+from flask_login import login_user
+import requests
+from retry.api import retry_call
+from selenium.common.exceptions import WebDriverException
+from selenium.webdriver import chrome, firefox
+import simplejson as json
+from six.moves import urllib
+from werkzeug.utils import parse_cookie
+
+# Superset framework imports
+from superset import app, db, security_manager
+from superset.models.schedules import (
+ EmailDeliveryType,
+ get_scheduler_model,
+ ScheduleType,
+ SliceEmailReportFormat,
+)
+from superset.tasks.celery_app import app as celery_app
+from superset.utils.core import (
+ get_email_address_list,
+ send_email_smtp,
+)
+
+# Globals
+config = app.config
+# NOTE(review): the functions in this module log via the root logger
+# (logging.info), so this level setting does not affect them.
+logging.getLogger('tasks.email_reports').setLevel(logging.INFO)
+
+# Time in seconds, we will wait for the page to load and render
+# (also used as the delay between element lookup retries)
+PAGE_RENDER_WAIT = 30
+
+
+# body: HTML string; data: {filename: bytes} attachments or None;
+# images: {content id: bytes} inline images or None
+EmailContent = namedtuple('EmailContent', ['body', 'data', 'images'])
+
+
+def _get_recipients(schedule):
+ bcc = config.get('EMAIL_REPORT_BCC_ADDRESS', None)
+
+ if schedule.deliver_as_group:
+ to = schedule.recipients
+ yield (to, bcc)
+ else:
+ for to in get_email_address_list(schedule.recipients):
+ yield (to, bcc)
+
+
+def _deliver_email(schedule, subject, email):
+ for (to, bcc) in _get_recipients(schedule):
+ send_email_smtp(
+ to, subject, email.body, config,
+ data=email.data,
+ images=email.images,
+ bcc=bcc,
+ mime_subtype='related',
+ dryrun=config.get('SCHEDULED_EMAIL_DEBUG_MODE'),
+ )
+
+
+def _generate_mail_content(schedule, screenshot, name, url):
+ if schedule.delivery_type == EmailDeliveryType.attachment:
+ images = None
+ data = {
+ 'screenshot.png': screenshot,
+ }
+ body = __(
+ '<b><a href="%(url)s">Explore in Superset</a></b><p></p>',
+ name=name,
+ url=url,
+ )
+ elif schedule.delivery_type == EmailDeliveryType.inline:
+ # Get the domain from the 'From' address ..
+ # and make a message id without the < > in the ends
+ domain = parseaddr(config.get('SMTP_MAIL_FROM'))[1].split('@')[1]
+ msgid = make_msgid(domain)[1:-1]
+
+ images = {
+ msgid: screenshot,
+ }
+ data = None
+ body = __(
+ """
+ <b><a href="%(url)s">Explore in Superset</a></b><p></p>
+ <img src="cid:%(msgid)s">
+ """,
+ name=name, url=url, msgid=msgid,
+ )
+
+ return EmailContent(body, data, images)
+
+
+def _get_auth_cookies():
+ # Login with the user specified to get the reports
+ with app.test_request_context():
+ user = security_manager.find_user(config.get('EMAIL_REPORTS_USER'))
+ login_user(user)
+
+ # A mock response object to get the cookie information from
+ response = Response()
+ app.session_interface.save_session(app, session, response)
+
+ cookies = []
+
+ # Set the cookies in the driver
+ for name, value in response.headers:
+ if name.lower() == 'set-cookie':
+ cookie = parse_cookie(value)
+ cookies.append(cookie['session'])
+
+ return cookies
+
+
+def _get_url_path(view, **kwargs):
+ with app.test_request_context():
+ return urllib.parse.urljoin(
+ str(config.get('WEBDRIVER_BASEURL')),
+ url_for(view, **kwargs),
+ )
+
+
+def create_webdriver():
+ # Create a webdriver for use in fetching reports
+ if config.get('EMAIL_REPORTS_WEBDRIVER') == 'firefox':
+ driver_class = firefox.webdriver.WebDriver
+ options = firefox.options.Options()
+ elif config.get('EMAIL_REPORTS_WEBDRIVER') == 'chrome':
+ driver_class = chrome.webdriver.WebDriver
+ options = chrome.options.Options()
+
+ options.add_argument('--headless')
+
+ # Prepare args for the webdriver init
+ kwargs = dict(
+ options=options,
+ )
+ kwargs.update(config.get('WEBDRIVER_CONFIGURATION'))
+
+ # Initialize the driver
+ driver = driver_class(**kwargs)
+
+ # Some webdrivers need an initial hit to the welcome URL
+ # before we set the cookie
+ welcome_url = _get_url_path('Superset.welcome')
+
+ # Hit the welcome URL and check if we were asked to login
+ driver.get(welcome_url)
+ elements = driver.find_elements_by_id('loginbox')
+
+ # This indicates that we were not prompted for a login box.
+ if not elements:
+ return driver
+
+ # Set the cookies in the driver
+ for cookie in _get_auth_cookies():
+ info = dict(name='session', value=cookie)
+ driver.add_cookie(info)
+
+ return driver
+
+
+def destroy_webdriver(driver):
+ """
+ Destroy a driver
+ """
+
+ # This is some very flaky code in selenium. Hence the retries
+ # and catch-all exceptions
+ try:
+ retry_call(driver.close, tries=2)
+ except Exception:
+ pass
+ try:
+ driver.quit()
+ except Exception:
+ pass
+
+
+def deliver_dashboard(schedule):
+ """
+ Given a schedule, delivery the dashboard as an email report
+ """
+ dashboard = schedule.dashboard
+
+ dashboard_url = _get_url_path(
+ 'Superset.dashboard',
+ dashboard_id=dashboard.id,
+ )
+
+ # Create a driver, fetch the page, wait for the page to render
+ driver = create_webdriver()
+ window = config.get('WEBDRIVER_WINDOW')['dashboard']
+ driver.set_window_size(*window)
+ driver.get(dashboard_url)
+ time.sleep(PAGE_RENDER_WAIT)
+
+ # Set up a function to retry once for the element.
+ # This is buggy in certain selenium versions with firefox driver
+ get_element = getattr(driver, 'find_element_by_class_name')
+ element = retry_call(
+ get_element,
+ fargs=['grid-container'],
+ tries=2,
+ delay=PAGE_RENDER_WAIT,
+ )
+
+ try:
+ screenshot = element.screenshot_as_png
+ except WebDriverException:
+ # Some webdrivers do not support screenshots for elements.
+ # In such cases, take a screenshot of the entire page.
+ screenshot = driver.screenshot() # pylint: disable=no-member
+ finally:
+ destroy_webdriver(driver)
+
+ # Generate the email body and attachments
+ email = _generate_mail_content(
+ schedule,
+ screenshot,
+ dashboard.dashboard_title,
+ dashboard_url,
+ )
+
+ subject = __(
+ '%(prefix)s %(title)s',
+ prefix=config.get('EMAIL_REPORTS_SUBJECT_PREFIX'),
+ title=dashboard.dashboard_title,
+ )
+
+ _deliver_email(schedule, subject, email)
+
+
+def _get_slice_data(schedule):
+ slc = schedule.slice
+
+ slice_url = _get_url_path(
+ 'Superset.explore_json',
+ csv='true',
+ form_data=json.dumps({'slice_id': slc.id}),
+ )
+
+ # URL to include in the email
+ url = _get_url_path(
+ 'Superset.slice',
+ slice_id=slc.id,
+ )
+
+ cookies = {}
+ for cookie in _get_auth_cookies():
+ cookies['session'] = cookie
+
+ response = requests.get(slice_url, cookies=cookies)
+ response.raise_for_status()
+
+ # TODO: Move to the csv module
+ rows = [r.split(b',') for r in response.content.splitlines()]
+
+ if schedule.delivery_type == EmailDeliveryType.inline:
+ data = None
+
+ # Parse the csv file and generate HTML
+ columns = rows.pop(0)
+ with app.app_context():
+ body = render_template(
+ 'superset/reports/slice_data.html',
+ columns=columns,
+ rows=rows,
+ name=slc.slice_name,
+ link=url,
+ )
+
+ elif schedule.delivery_type == EmailDeliveryType.attachment:
+ data = {
+ __('%(name)s.csv', name=slc.slice_name): response.content,
+ }
+ body = __(
+ '<b><a href="%(url)s">Explore in Superset</a></b><p></p>',
+ name=slc.slice_name,
+ url=url,
+ )
+
+ return EmailContent(body, data, None)
+
+
+def _get_slice_visualization(schedule):
+ slc = schedule.slice
+
+ # Create a driver, fetch the page, wait for the page to render
+ driver = create_webdriver()
+ window = config.get('WEBDRIVER_WINDOW')['slice']
+ driver.set_window_size(*window)
+
+ slice_url = _get_url_path(
+ 'Superset.slice',
+ slice_id=slc.id,
+ )
+
+ driver.get(slice_url)
+ time.sleep(PAGE_RENDER_WAIT)
+
+ # Set up a function to retry once for the element.
+ # This is buggy in certain selenium versions with firefox driver
+ element = retry_call(
+ driver.find_element_by_class_name,
+ fargs=['chart-container'],
+ tries=2,
+ delay=PAGE_RENDER_WAIT,
+ )
+
+ try:
+ screenshot = element.screenshot_as_png
+ except WebDriverException:
+ # Some webdrivers do not support screenshots for elements.
+ # In such cases, take a screenshot of the entire page.
+ screenshot = driver.screenshot() # pylint: disable=no-member
+ finally:
+ destroy_webdriver(driver)
+
+ # Generate the email body and attachments
+ return _generate_mail_content(
+ schedule,
+ screenshot,
+ slc.slice_name,
+ slice_url,
+ )
+
+
+def deliver_slice(schedule):
+ """
+ Given a schedule, delivery the slice as an email report
+ """
+ if schedule.email_format == SliceEmailReportFormat.data:
+ email = _get_slice_data(schedule)
+ elif schedule.email_format == SliceEmailReportFormat.visualization:
+ email = _get_slice_visualization(schedule)
+ else:
+ raise RuntimeError('Unknown email report format')
+
+ subject = __(
+ '%(prefix)s %(title)s',
+ prefix=config.get('EMAIL_REPORTS_SUBJECT_PREFIX'),
+ title=schedule.slice.slice_name,
+ )
+
+ _deliver_email(schedule, subject, email)
+
+
+@celery_app.task(name='email_reports.send', bind=True, soft_time_limit=300)
+def schedule_email_report(task, report_type, schedule_id, recipients=None):
+ model_cls = get_scheduler_model(report_type)
+ schedule = db.session.query(model_cls).get(schedule_id)
+
+ # The user may have disabled the schedule. If so, ignore this
+ if not schedule or not schedule.active:
+ logging.info('Ignoring deactivated schedule')
+ return
+
+ # TODO: Detach the schedule object from the db session
+ if recipients is not None:
+ schedule.id = schedule_id
+ schedule.recipients = recipients
+
+ if report_type == ScheduleType.dashboard.value:
+ deliver_dashboard(schedule)
+ elif report_type == ScheduleType.slice.value:
+ deliver_slice(schedule)
+ else:
+ raise RuntimeError('Unknown report type')
+
+
+def next_schedules(crontab, start_at, stop_at, resolution=0):
+ crons = croniter.croniter(crontab, start_at - timedelta(seconds=1))
+ previous = start_at - timedelta(days=1)
+
+ for eta in crons.all_next(datetime):
+ # Do not cross the time boundary
+ if eta >= stop_at:
+ break
+
+ if eta < start_at:
+ continue
+
+ # Do not allow very frequent tasks
+ if eta - previous < timedelta(seconds=resolution):
+ continue
+
+ yield eta
+ previous = eta
+
+
+def schedule_window(report_type, start_at, stop_at, resolution):
+ """
+ Find all active schedules and schedule celery tasks for
+ each of them with a specific ETA (determined by parsing
+ the cron schedule for the schedule)
+ """
+ model_cls = get_scheduler_model(report_type)
+ dbsession = db.create_scoped_session()
+ schedules = dbsession.query(model_cls).filter(model_cls.active.is_(True))
+
+ for schedule in schedules:
+ args = (
+ report_type,
+ schedule.id,
+ )
+
+ # Schedule the job for the specified time window
+ for eta in next_schedules(schedule.crontab,
+ start_at,
+ stop_at,
+ resolution=resolution):
+ schedule_email_report.apply_async(args, eta=eta)
+
+
+@celery_app.task(name='email_reports.schedule_hourly')
+def schedule_hourly():
+ """ Celery beat job meant to be invoked hourly """
+
+ if not config.get('ENABLE_SCHEDULED_EMAIL_REPORTS'):
+ logging.info('Scheduled email reports not enabled in config')
+ return
+
+ resolution = config.get('EMAIL_REPORTS_CRON_RESOLUTION', 0) * 60
+
+ # Get the top of the hour
+ start_at = datetime.now(tzlocal()).replace(microsecond=0, second=0, minute=0)
+ stop_at = start_at + timedelta(seconds=3600)
+ schedule_window(ScheduleType.dashboard.value, start_at, stop_at, resolution)
+ schedule_window(ScheduleType.slice.value, start_at, stop_at, resolution)
diff --git a/superset/templates/superset/reports/slice_data.html b/superset/templates/superset/reports/slice_data.html
new file mode 100644
index 0000000..c5bb3d1
--- /dev/null
+++ b/superset/templates/superset/reports/slice_data.html
@@ -0,0 +1,31 @@
+{#-
+  Inline HTML rendering of a chart's data for scheduled email reports.
+  Template variables: link (target of the "Explore" link), name (title in
+  the first header row), columns (header cells) and rows (sequences of
+  cells). Headers and cells are rendered via .decode('utf-8'), so they are
+  presumably bytes - confirm against the caller.
+-#}
+<html>
+  <head>
+  </head>
+  <body>
+    <p></p>
+    <b><a href="{{ link }}">Explore this data in Superset</a></b>
+    <p></p>
+    <table border='1' cellspacing='0' cellpadding='3'
+           style='border: 1px solid #c0c0c0; border-collapse: collapse;'>
+      <thead>
+        <tr>
+          <th colspan={{ columns | length }} bgcolor='#c0c0c0'>{{ name }}</th>
+        </tr>
+        <tr>
+          {%- for column in columns %}
+          <th bgcolor='#f0f0f0'>{{ column.decode('utf-8') | replace('_', ' ') | title }}</th>
+          {%- endfor %}
+        </tr>
+      </thead>
+      <tbody>
+        {%- for row in rows %}
+        <tr>
+          {%- for column in row %}
+          <td>{{ column.decode('utf-8') }}</td>
+          {%- endfor %}
+        </tr>
+        {%- endfor %}
+      </tbody>
+    </table>
+  </body>
+</html>
diff --git a/superset/utils/core.py b/superset/utils/core.py
index bd23d07..2a002b8 100644
--- a/superset/utils/core.py
+++ b/superset/utils/core.py
@@ -4,6 +4,7 @@ from builtins import object
from datetime import date, datetime, time, timedelta
import decimal
from email.mime.application import MIMEApplication
+from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
@@ -582,21 +583,23 @@ def notify_user_about_perm_udate(
dryrun=not config.get('EMAIL_NOTIFICATIONS'))
-def send_email_smtp(to, subject, html_content, config, files=None,
- dryrun=False, cc=None, bcc=None, mime_subtype='mixed'):
+def send_email_smtp(to, subject, html_content, config,
+ files=None, data=None, images=None, dryrun=False,
+ cc=None, bcc=None, mime_subtype='mixed'):
"""
Send an email with html content, eg:
send_email_smtp(
'test@example.com', 'foo', '<b>Foo</b> bar',['/dev/null'], dryrun=True)
"""
smtp_mail_from = config.get('SMTP_MAIL_FROM')
-
to = get_email_address_list(to)
msg = MIMEMultipart(mime_subtype)
msg['Subject'] = subject
msg['From'] = smtp_mail_from
msg['To'] = ', '.join(to)
+ msg.preamble = 'This is a multi-part message in MIME format.'
+
recipients = to
if cc:
cc = get_email_address_list(cc)
@@ -612,6 +615,7 @@ def send_email_smtp(to, subject, html_content, config, files=None,
mime_text = MIMEText(html_content, 'html')
msg.attach(mime_text)
+ # Attach files by reading them from disk
for fname in files or []:
basename = os.path.basename(fname)
with open(fname, 'rb') as f:
@@ -621,6 +625,23 @@ def send_email_smtp(to, subject, html_content, config, files=None,
Content_Disposition="attachment; filename='%s'" % basename,
Name=basename))
+ # Attach any files passed directly
+ for name, body in (data or {}).items():
+ msg.attach(
+ MIMEApplication(
+ body,
+ Content_Disposition="attachment; filename='%s'" % name,
+ Name=name,
+ ))
+
+ # Attach any inline images, which may be required for display in
+ # HTML content (inline)
+ for msgid, body in (images or {}).items():
+ image = MIMEImage(body)
+ image.add_header('Content-ID', '<%s>' % msgid)
+ image.add_header('Content-Disposition', 'inline')
+ msg.attach(image)
+
send_MIME_email(smtp_mail_from, recipients, msg, config, dryrun=dryrun)
@@ -639,7 +660,7 @@ def send_MIME_email(e_from, e_to, mime_msg, config, dryrun=False):
s.starttls()
if SMTP_USER and SMTP_PASSWORD:
s.login(SMTP_USER, SMTP_PASSWORD)
- logging.info('Sent an alert email to ' + str(e_to))
+ logging.info('Sent an email to ' + str(e_to))
s.sendmail(e_from, e_to, mime_msg.as_string())
s.quit()
else:
@@ -651,11 +672,13 @@ def get_email_address_list(address_string):
if isinstance(address_string, basestring):
if ',' in address_string:
address_string = address_string.split(',')
+ elif '\n' in address_string:
+ address_string = address_string.split('\n')
elif ';' in address_string:
address_string = address_string.split(';')
else:
address_string = [address_string]
- return address_string
+ return [x.strip() for x in address_string if x.strip()]
def choicify(values):
diff --git a/superset/views/__init__.py b/superset/views/__init__.py
index eed1ff2..6bd52d9 100644
--- a/superset/views/__init__.py
+++ b/superset/views/__init__.py
@@ -4,3 +4,4 @@ from . import core # noqa
from . import sql_lab # noqa
from . import annotations # noqa
from . import datasource # noqa
+from . import schedules # noqa
diff --git a/superset/views/core.py b/superset/views/core.py
index c565e3a..3c0ffa0 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -1635,7 +1635,6 @@ class Superset(BaseSupersetView):
@staticmethod
def _set_dash_metadata(dashboard, data):
positions = data['positions']
-
# find slices in the position data
slice_ids = []
slice_id_to_name = {}
diff --git a/superset/views/schedules.py b/superset/views/schedules.py
new file mode 100644
index 0000000..3e57621
--- /dev/null
+++ b/superset/views/schedules.py
@@ -0,0 +1,279 @@
+# pylint: disable=C,R,W
+
+import enum
+
+from croniter import croniter
+from flask import flash, g
+from flask_appbuilder import expose
+from flask_appbuilder.models.sqla.interface import SQLAInterface
+from flask_appbuilder.security.decorators import has_access
+from flask_babel import gettext as __
+from flask_babel import lazy_gettext as _
+import simplejson as json
+from wtforms import BooleanField, StringField
+
+from superset import app, appbuilder, db, security_manager
+from superset.exceptions import SupersetException
+from superset.models.core import Dashboard, Slice
+from superset.models.schedules import (
+ DashboardEmailSchedule,
+ ScheduleType,
+ SliceEmailSchedule,
+)
+from superset.tasks.schedules import schedule_email_report
+from superset.utils.core import (
+ get_email_address_list,
+ json_iso_dttm_ser,
+)
+from superset.views.core import json_success
+from .base import DeleteMixin, SupersetModelView
+
+
+class EmailScheduleView(SupersetModelView, DeleteMixin):
+    """
+    Base CRUD view for email report schedules.
+
+    Subclasses set ``schedule_type`` (the attribute name of the scheduled
+    object on the schedule model), ``schedule_type_model`` (that object's
+    model class) and the usual Flask-AppBuilder column configuration.
+    """
+
+    # Defaults for the non-model "test email" form fields. This class-level
+    # dict is shared by every subclass, so it is only read, never mutated;
+    # process_form stores submitted values on the view instance instead.
+    _extra_data = {
+        'test_email': False,
+        'test_email_recipients': None,
+    }
+    schedule_type = None
+    schedule_type_model = None
+
+    page_size = 20
+
+    add_exclude_columns = [
+        'user',
+        'created_on',
+        'changed_on',
+        'created_by',
+        'changed_by',
+    ]
+
+    edit_exclude_columns = add_exclude_columns
+
+    description_columns = {
+        'deliver_as_group': 'If enabled, send a single email to all '
+        'recipients (in email/To: field)',
+        'crontab': 'Unix style crontab schedule to deliver emails. '
+        'Changes to schedules reflect in one hour.',
+        'delivery_type': 'Indicates how the rendered content is delivered',
+    }
+
+    add_form_extra_fields = {
+        'test_email': BooleanField(
+            'Send Test Email',
+            default=False,
+            description='If enabled, we send a test mail on create / update',
+        ),
+        'test_email_recipients': StringField(
+            'Test Email Recipients',
+            default=None,
+            description='List of recipients to send test email to. '
+            'If empty, we send it to the original recipients',
+        ),
+    }
+
+    edit_form_extra_fields = add_form_extra_fields
+
+    def process_form(self, form, is_created):
+        """Capture the extra test-email form fields before the model is saved."""
+        # The field defaults to None, so guard before stripping.
+        recipients = (form.test_email_recipients.data or '').strip() or None
+        # Rebind on the instance rather than mutating the class-level dict
+        # that the dashboard and slice views share.
+        self._extra_data = {
+            'test_email': form.test_email.data,
+            'test_email_recipients': recipients,
+        }
+
+    def pre_add(self, obj):
+        """
+        Normalize recipients, default the owner and validate the crontab.
+
+        :raises SupersetException: if the recipient list cannot be parsed or
+            the crontab is invalid
+        """
+        try:
+            recipients = get_email_address_list(obj.recipients)
+            obj.recipients = ', '.join(recipients)
+        except Exception:
+            raise SupersetException('Invalid email list')
+
+        obj.user = obj.user or g.user
+        if not croniter.is_valid(obj.crontab):
+            raise SupersetException('Invalid crontab format')
+
+    def pre_update(self, obj):
+        self.pre_add(obj)
+
+    def post_add(self, obj):
+        """Queue a test email if requested and warn about the apply delay."""
+        # Schedule a test mail if the user requested for it.
+        if self._extra_data['test_email']:
+            recipients = self._extra_data['test_email_recipients']
+            args = (self.schedule_type, obj.id)
+            kwargs = dict(recipients=recipients)
+            schedule_email_report.apply_async(args=args, kwargs=kwargs)
+
+        # Notify the user that schedule changes will be activate only in the
+        # next hour
+        if obj.active:
+            flash('Schedule changes will get applied in one hour', 'warning')
+
+    def post_update(self, obj):
+        self.post_add(obj)
+
+    @has_access
+    @expose('/fetch/<int:item_id>/', methods=['GET'])
+    def fetch_schedules(self, item_id):
+        """
+        Return, as JSON, the schedules attached to the object ``item_id``.
+
+        Each entry holds the schedule id under ``schedule``, the list and
+        excluded columns (enums as their names, users as usernames) and the
+        scheduled object's id under the key named by ``schedule_type``.
+        """
+        query = db.session.query(self.datamodel.obj)
+        query = query.join(self.schedule_type_model).filter(
+            self.schedule_type_model.id == item_id)
+
+        schedules = []
+        for schedule in query.all():
+            info = {'schedule': schedule.id}
+
+            for col in self.list_columns + self.add_exclude_columns:
+                info[col] = getattr(schedule, col)
+
+                if isinstance(info[col], enum.Enum):
+                    info[col] = info[col].name
+                elif isinstance(info[col], security_manager.user_model):
+                    info[col] = info[col].username
+
+            info['user'] = schedule.user.username
+            info[self.schedule_type] = getattr(schedule, self.schedule_type).id
+            schedules.append(info)
+
+        return json_success(json.dumps(schedules, default=json_iso_dttm_ser))
+
+
+class DashboardEmailScheduleView(EmailScheduleView):
+    """CRUD view for email report schedules whose subject is a dashboard."""
+
+    # Attribute name of the scheduled object on the schedule model, and that
+    # object's model class; both are read by the EmailScheduleView methods.
+    schedule_type = ScheduleType.dashboard.name
+    schedule_type_model = Dashboard
+
+    add_title = _('Schedule Email Reports for Dashboards')
+    edit_title = add_title
+    list_title = _('Manage Email Reports for Dashboards')
+
+    datamodel = SQLAInterface(DashboardEmailSchedule)
+    order_columns = ['user', 'dashboard', 'created_on']
+
+    list_columns = [
+        'dashboard',
+        'active',
+        'crontab',
+        'user',
+        'deliver_as_group',
+        'delivery_type',
+    ]
+
+    # 'test_email' and 'test_email_recipients' are not model columns; they
+    # are the extra form fields declared in add_form_extra_fields.
+    add_columns = [
+        'dashboard',
+        'active',
+        'crontab',
+        'recipients',
+        'deliver_as_group',
+        'delivery_type',
+        'test_email',
+        'test_email_recipients',
+    ]
+
+    edit_columns = add_columns
+
+    search_columns = [
+        'dashboard',
+        'active',
+        'user',
+        'deliver_as_group',
+        'delivery_type',
+    ]
+
+    label_columns = {
+        'dashboard': _('Dashboard'),
+        'created_on': _('Created On'),
+        'changed_on': _('Changed On'),
+        'user': _('User'),
+        'active': _('Active'),
+        'crontab': _('Crontab'),
+        'recipients': _('Recipients'),
+        'deliver_as_group': _('Deliver As Group'),
+        'delivery_type': _('Delivery Type'),
+    }
+
+    def pre_add(self, obj):
+        """
+        Reject schedules without a dashboard, then run the shared checks.
+
+        :raises SupersetException: if no dashboard was selected, or from the
+            base-class recipient/crontab validation
+        """
+        if obj.dashboard is None:
+            raise SupersetException('Dashboard is mandatory')
+        super(DashboardEmailScheduleView, self).pre_add(obj)
+
+
+class SliceEmailScheduleView(EmailScheduleView):
+    """CRUD view for email report schedules whose subject is a chart (slice)."""
+
+    # Attribute name of the scheduled object on the schedule model, and that
+    # object's model class; both are read by the EmailScheduleView methods.
+    schedule_type = ScheduleType.slice.name
+    schedule_type_model = Slice
+    add_title = _('Schedule Email Reports for Charts')
+    edit_title = add_title
+    list_title = _('Manage Email Reports for Charts')
+
+    datamodel = SQLAInterface(SliceEmailSchedule)
+    order_columns = ['user', 'slice', 'created_on']
+    # Unlike dashboards, chart schedules also expose 'email_format'.
+    list_columns = [
+        'slice',
+        'active',
+        'crontab',
+        'user',
+        'deliver_as_group',
+        'delivery_type',
+        'email_format',
+    ]
+
+    # 'test_email' and 'test_email_recipients' are not model columns; they
+    # are the extra form fields declared in add_form_extra_fields.
+    add_columns = [
+        'slice',
+        'active',
+        'crontab',
+        'recipients',
+        'deliver_as_group',
+        'delivery_type',
+        'email_format',
+        'test_email',
+        'test_email_recipients',
+    ]
+
+    edit_columns = add_columns
+
+    search_columns = [
+        'slice',
+        'active',
+        'user',
+        'deliver_as_group',
+        'delivery_type',
+        'email_format',
+    ]
+
+    label_columns = {
+        'slice': _('Chart'),
+        'created_on': _('Created On'),
+        'changed_on': _('Changed On'),
+        'user': _('User'),
+        'active': _('Active'),
+        'crontab': _('Crontab'),
+        'recipients': _('Recipients'),
+        'deliver_as_group': _('Deliver As Group'),
+        'delivery_type': _('Delivery Type'),
+        'email_format': _('Email Format'),
+    }
+
+    def pre_add(self, obj):
+        """
+        Reject schedules without a chart, then run the shared checks.
+
+        :raises SupersetException: if no chart was selected, or from the
+            base-class recipient/crontab validation
+        """
+        if obj.slice is None:
+            raise SupersetException('Slice is mandatory')
+        super(SliceEmailScheduleView, self).pre_add(obj)
+
+
+def _register_schedule_menus():
+    """Add the dashboard and chart email schedule views to the Manage menu."""
+    appbuilder.add_separator('Manage')
+
+    # (view class, registered view name, translated menu label)
+    menu_entries = (
+        (DashboardEmailScheduleView,
+         'Dashboard Email Schedules',
+         __('Dashboard Emails')),
+        (SliceEmailScheduleView,
+         'Chart Emails',
+         __('Chart Email Schedules')),
+    )
+    for view_cls, view_name, view_label in menu_entries:
+        appbuilder.add_view(
+            view_cls,
+            view_name,
+            label=view_label,
+            category='Manage',
+            category_label=__('Manage'),
+            icon='fa-search')
+
+
+# The schedule views are only exposed when the feature flag is enabled.
+if app.config.get('ENABLE_SCHEDULED_EMAIL_REPORTS'):
+    _register_schedule_menus()
diff --git a/tests/email_tests.py b/tests/email_tests.py
index 559372b..229eb8b 100644
--- a/tests/email_tests.py
+++ b/tests/email_tests.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
"""Unit tests for email service in Superset"""
from email.mime.application import MIMEApplication
+from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
import logging
import tempfile
@@ -10,6 +11,7 @@ import mock
from superset import app
from superset.utils import core as utils
+from .utils import read_fixture
send_email_test = mock.Mock()
@@ -38,6 +40,39 @@ class EmailSmtpTest(unittest.TestCase):
assert msg.get_payload()[-1].get_payload() == mimeapp.get_payload()
@mock.patch('superset.utils.core.send_MIME_email')
+    def test_send_smtp_data(self, mock_send_mime):
+        """In-memory ``data`` attachments are added as the last MIME part."""
+        utils.send_email_smtp(
+            'to', 'subject', 'content', app.config, data={'1.txt': b'data'})
+        assert mock_send_mime.called
+        call_args = mock_send_mime.call_args[0]
+        logging.debug(call_args)
+        assert call_args[0] == app.config.get('SMTP_MAIL_FROM')
+        assert call_args[1] == ['to']
+        msg = call_args[2]
+        assert msg['Subject'] == 'subject'
+        assert msg['From'] == app.config.get('SMTP_MAIL_FROM')
+        assert len(msg.get_payload()) == 2
+        # Build the expected part from the same bytes that were attached;
+        # MIMEApplication takes raw bytes, not text.
+        mimeapp = MIMEApplication(b'data')
+        assert msg.get_payload()[-1].get_payload() == mimeapp.get_payload()
+
+    @mock.patch('superset.utils.core.send_MIME_email')
+    def test_send_smtp_inline_images(self, mock_send_mime):
+        """Inline ``images`` are attached after the HTML part."""
+        image_bytes = read_fixture('sample.png')
+        utils.send_email_smtp(
+            'to', 'subject', 'content', app.config,
+            images={'blah': image_bytes})
+        assert mock_send_mime.called
+
+        call_args = mock_send_mime.call_args[0]
+        logging.debug(call_args)
+        mail_from = app.config.get('SMTP_MAIL_FROM')
+        assert call_args[0] == mail_from
+        assert call_args[1] == ['to']
+
+        message = call_args[2]
+        assert message['Subject'] == 'subject'
+        assert message['From'] == mail_from
+
+        parts = message.get_payload()
+        assert len(parts) == 2
+        expected = MIMEImage(image_bytes)
+        assert parts[-1].get_payload() == expected.get_payload()
+
+ @mock.patch('superset.utils.core.send_MIME_email')
def test_send_bcc_smtp(self, mock_send_mime):
attachment = tempfile.NamedTemporaryFile()
attachment.write(b'attachment')
diff --git a/tests/fixtures/sample.png b/tests/fixtures/sample.png
new file mode 100644
index 0000000..a352241
Binary files /dev/null and b/tests/fixtures/sample.png differ
diff --git a/tests/fixtures/trends.csv b/tests/fixtures/trends.csv
new file mode 100644
index 0000000..1e347d9
--- /dev/null
+++ b/tests/fixtures/trends.csv
@@ -0,0 +1,3 @@
+t1,t2,t3__sum
+c11,c12,c13
+c21,c22,c23
diff --git a/tests/schedules_test.py b/tests/schedules_test.py
new file mode 100644
index 0000000..9ade8eb
--- /dev/null
+++ b/tests/schedules_test.py
@@ -0,0 +1,368 @@
+from datetime import datetime, timedelta
+import unittest
+
+from flask_babel import gettext as __
+from mock import Mock, patch, PropertyMock
+from selenium.common.exceptions import WebDriverException
+
+from superset import app, db
+from superset.models.core import Dashboard, Slice
+from superset.models.schedules import (
+ DashboardEmailSchedule,
+ EmailDeliveryType,
+ SliceEmailReportFormat,
+ SliceEmailSchedule,
+)
+from superset.tasks.schedules import (
+ create_webdriver,
+ deliver_dashboard,
+ deliver_slice,
+ next_schedules,
+)
+from .utils import read_fixture
+
+
+class SchedulesTestCase(unittest.TestCase):
+    """
+    Tests for crontab expansion and for delivering dashboard/slice reports.
+
+    One dashboard schedule and one slice schedule are created for the whole
+    class and shared by every test, so tests that depend on a delivery
+    setting assign it explicitly instead of relying on test execution order.
+    """
+
+    RECIPIENTS = 'recipient1@superset.com, recipient2@superset.com'
+    BCC = 'bcc@superset.com'
+    CSV = read_fixture('trends.csv')
+
+    @classmethod
+    def setUpClass(cls):
+        cls.common_data = dict(
+            active=True,
+            crontab='* * * * *',
+            recipients=cls.RECIPIENTS,
+            deliver_as_group=True,
+            delivery_type=EmailDeliveryType.inline,
+        )
+
+        # Pick up a random slice and dashboard
+        slce = db.session.query(Slice).all()[0]
+        dashboard = db.session.query(Dashboard).all()[0]
+
+        dashboard_schedule = DashboardEmailSchedule(**cls.common_data)
+        dashboard_schedule.dashboard_id = dashboard.id
+        dashboard_schedule.user_id = 1
+        db.session.add(dashboard_schedule)
+
+        slice_schedule = SliceEmailSchedule(**cls.common_data)
+        slice_schedule.slice_id = slce.id
+        slice_schedule.user_id = 1
+        slice_schedule.email_format = SliceEmailReportFormat.data
+
+        db.session.add(slice_schedule)
+        db.session.commit()
+
+        # Keep only the ids; each test re-queries the row it needs.
+        cls.slice_schedule = slice_schedule.id
+        cls.dashboard_schedule = dashboard_schedule.id
+
+    @classmethod
+    def tearDownClass(cls):
+        db.session.query(SliceEmailSchedule).filter_by(id=cls.slice_schedule).delete()
+        db.session.query(DashboardEmailSchedule).filter_by(
+            id=cls.dashboard_schedule).delete()
+        db.session.commit()
+
+    def test_crontab_scheduler(self):
+        crontab = '* * * * *'
+
+        start_at = datetime.now().replace(microsecond=0, second=0, minute=0)
+        stop_at = start_at + timedelta(seconds=3600)
+
+        # Fire off the task every minute
+        schedules = list(next_schedules(crontab, start_at, stop_at, resolution=0))
+
+        self.assertEqual(schedules[0], start_at)
+        self.assertEqual(schedules[-1], stop_at - timedelta(seconds=60))
+        self.assertEqual(len(schedules), 60)
+
+        # Fire off the task every 10 minutes, controlled via resolution
+        schedules = list(next_schedules(crontab, start_at, stop_at, resolution=10 * 60))
+
+        self.assertEqual(schedules[0], start_at)
+        self.assertEqual(schedules[-1], stop_at - timedelta(seconds=10 * 60))
+        self.assertEqual(len(schedules), 6)
+
+        # Fire off the task every 12 minutes, controlled via resolution
+        schedules = list(next_schedules(crontab, start_at, stop_at, resolution=12 * 60))
+
+        self.assertEqual(schedules[0], start_at)
+        self.assertEqual(schedules[-1], stop_at - timedelta(seconds=12 * 60))
+        self.assertEqual(len(schedules), 5)
+
+    def test_wider_schedules(self):
+        crontab = '*/15 2,10 * * *'
+
+        for hour in range(0, 24):
+            start_at = datetime.now().replace(
+                microsecond=0, second=0, minute=0, hour=hour)
+            stop_at = start_at + timedelta(seconds=3600)
+            schedules = list(next_schedules(crontab, start_at, stop_at, resolution=0))
+
+            if hour in (2, 10):
+                self.assertEqual(len(schedules), 4)
+            else:
+                self.assertEqual(len(schedules), 0)
+
+    def test_complex_schedule(self):
+        # Run the job on every Friday of March and May
+        # On these days, run the job at
+        # 5:10 pm
+        # 5:11 pm
+        # 5:12 pm
+        # 5:13 pm
+        # 5:14 pm
+        # 5:15 pm
+        # 5:25 pm
+        # 5:28 pm
+        # 5:31 pm
+        # 5:34 pm
+        # 5:37 pm
+        # 5:40 pm
+        crontab = '10-15,25-40/3 17 * 3,5 5'
+        start_at = datetime.strptime('2018/01/01', '%Y/%m/%d')
+        stop_at = datetime.strptime('2018/12/31', '%Y/%m/%d')
+
+        schedules = list(next_schedules(crontab, start_at, stop_at, resolution=60))
+        self.assertEqual(len(schedules), 108)
+        fmt = '%Y-%m-%d %H:%M:%S'
+        self.assertEqual(schedules[0], datetime.strptime('2018-03-02 17:10:00', fmt))
+        self.assertEqual(schedules[-1], datetime.strptime('2018-05-25 17:40:00', fmt))
+        self.assertEqual(schedules[59], datetime.strptime('2018-03-30 17:40:00', fmt))
+        self.assertEqual(schedules[60], datetime.strptime('2018-05-04 17:10:00', fmt))
+
+    @patch('superset.tasks.schedules.firefox.webdriver.WebDriver')
+    def test_create_driver(self, mock_driver_class):
+        mock_driver = Mock()
+        mock_driver_class.return_value = mock_driver
+        mock_driver.find_elements_by_id.side_effect = [True, False]
+
+        create_webdriver()
+        create_webdriver()
+        mock_driver.add_cookie.assert_called_once()
+
+    @patch('superset.tasks.schedules.firefox.webdriver.WebDriver')
+    @patch('superset.tasks.schedules.send_email_smtp')
+    @patch('superset.tasks.schedules.time')
+    def test_deliver_dashboard_inline(self, mtime, send_email_smtp, driver_class):
+        element = Mock()
+        driver = Mock()
+        mtime.sleep.return_value = None
+
+        driver_class.return_value = driver
+
+        # Ensure that we are able to login with the driver
+        driver.find_elements_by_id.side_effect = [True, False]
+        driver.find_element_by_class_name.return_value = element
+        element.screenshot_as_png = read_fixture('sample.png')
+
+        schedule = db.session.query(DashboardEmailSchedule).filter_by(
+            id=self.dashboard_schedule).all()[0]
+
+        # Other tests mutate this shared schedule; pin what this test covers.
+        schedule.delivery_type = EmailDeliveryType.inline
+
+        deliver_dashboard(schedule)
+        mtime.sleep.assert_called_once()
+        driver.screenshot.assert_not_called()
+        send_email_smtp.assert_called_once()
+
+    @patch('superset.tasks.schedules.firefox.webdriver.WebDriver')
+    @patch('superset.tasks.schedules.send_email_smtp')
+    @patch('superset.tasks.schedules.time')
+    def test_deliver_dashboard_as_attachment(self, mtime, send_email_smtp, driver_class):
+        element = Mock()
+        driver = Mock()
+        mtime.sleep.return_value = None
+
+        driver_class.return_value = driver
+
+        # Ensure that we are able to login with the driver
+        driver.find_elements_by_id.side_effect = [True, False]
+        driver.find_element_by_id.return_value = element
+        driver.find_element_by_class_name.return_value = element
+        element.screenshot_as_png = read_fixture('sample.png')
+
+        schedule = db.session.query(DashboardEmailSchedule).filter_by(
+            id=self.dashboard_schedule).all()[0]
+
+        schedule.delivery_type = EmailDeliveryType.attachment
+        deliver_dashboard(schedule)
+
+        mtime.sleep.assert_called_once()
+        driver.screenshot.assert_not_called()
+        send_email_smtp.assert_called_once()
+        self.assertIsNone(send_email_smtp.call_args[1]['images'])
+        self.assertEqual(
+            send_email_smtp.call_args[1]['data']['screenshot.png'],
+            element.screenshot_as_png,
+        )
+
+    @patch('superset.tasks.schedules.firefox.webdriver.WebDriver')
+    @patch('superset.tasks.schedules.send_email_smtp')
+    @patch('superset.tasks.schedules.time')
+    def test_dashboard_chrome_like(self, mtime, send_email_smtp, driver_class):
+        # Test functionality for chrome driver which does not support
+        # element snapshots
+        element = Mock()
+        driver = Mock()
+        mtime.sleep.return_value = None
+        type(element).screenshot_as_png = PropertyMock(side_effect=WebDriverException)
+
+        driver_class.return_value = driver
+
+        # Ensure that we are able to login with the driver
+        driver.find_elements_by_id.side_effect = [True, False]
+        driver.find_element_by_id.return_value = element
+        driver.find_element_by_class_name.return_value = element
+        driver.screenshot.return_value = read_fixture('sample.png')
+
+        schedule = db.session.query(DashboardEmailSchedule).filter_by(
+            id=self.dashboard_schedule).all()[0]
+
+        # The assertions below need an inline, single-group delivery.
+        schedule.delivery_type = EmailDeliveryType.inline
+        schedule.deliver_as_group = True
+
+        deliver_dashboard(schedule)
+        mtime.sleep.assert_called_once()
+        driver.screenshot.assert_called_once()
+        send_email_smtp.assert_called_once()
+
+        self.assertEqual(send_email_smtp.call_args[0][0], self.RECIPIENTS)
+        self.assertEqual(
+            list(send_email_smtp.call_args[1]['images'].values())[0],
+            driver.screenshot.return_value,
+        )
+
+    @patch('superset.tasks.schedules.firefox.webdriver.WebDriver')
+    @patch('superset.tasks.schedules.send_email_smtp')
+    @patch('superset.tasks.schedules.time')
+    def test_deliver_email_options(self, mtime, send_email_smtp, driver_class):
+        element = Mock()
+        driver = Mock()
+        mtime.sleep.return_value = None
+
+        driver_class.return_value = driver
+
+        # Ensure that we are able to login with the driver
+        driver.find_elements_by_id.side_effect = [True, False]
+        driver.find_element_by_class_name.return_value = element
+        element.screenshot_as_png = read_fixture('sample.png')
+
+        schedule = db.session.query(DashboardEmailSchedule).filter_by(
+            id=self.dashboard_schedule).all()[0]
+
+        # Send individual mails to the group
+        schedule.deliver_as_group = False
+
+        # Set a bcc email address only for this delivery; patch.dict restores
+        # app.config afterwards so the setting does not leak into other tests.
+        with patch.dict(app.config, {'EMAIL_REPORT_BCC_ADDRESS': self.BCC}):
+            deliver_dashboard(schedule)
+
+        mtime.sleep.assert_called_once()
+        driver.screenshot.assert_not_called()
+
+        self.assertEqual(send_email_smtp.call_count, 2)
+        self.assertEqual(send_email_smtp.call_args[1]['bcc'], self.BCC)
+
+    @patch('superset.tasks.schedules.firefox.webdriver.WebDriver')
+    @patch('superset.tasks.schedules.send_email_smtp')
+    @patch('superset.tasks.schedules.time')
+    def test_deliver_slice_inline_image(self, mtime, send_email_smtp, driver_class):
+        element = Mock()
+        driver = Mock()
+        mtime.sleep.return_value = None
+
+        driver_class.return_value = driver
+
+        # Ensure that we are able to login with the driver
+        driver.find_elements_by_id.side_effect = [True, False]
+        driver.find_element_by_class_name.return_value = element
+        element.screenshot_as_png = read_fixture('sample.png')
+
+        schedule = db.session.query(SliceEmailSchedule).filter_by(
+            id=self.slice_schedule).all()[0]
+
+        schedule.email_format = SliceEmailReportFormat.visualization
+        # Was ``delivery_format``, which is not a schedule field, so the test
+        # only passed when an earlier test had left delivery_type inline.
+        schedule.delivery_type = EmailDeliveryType.inline
+
+        deliver_slice(schedule)
+        mtime.sleep.assert_called_once()
+        driver.screenshot.assert_not_called()
+        send_email_smtp.assert_called_once()
+
+        self.assertEqual(
+            list(send_email_smtp.call_args[1]['images'].values())[0],
+            element.screenshot_as_png,
+        )
+
+    @patch('superset.tasks.schedules.firefox.webdriver.WebDriver')
+    @patch('superset.tasks.schedules.send_email_smtp')
+    @patch('superset.tasks.schedules.time')
+    def test_deliver_slice_attachment(self, mtime, send_email_smtp, driver_class):
+        element = Mock()
+        driver = Mock()
+        mtime.sleep.return_value = None
+
+        driver_class.return_value = driver
+
+        # Ensure that we are able to login with the driver
+        driver.find_elements_by_id.side_effect = [True, False]
+        driver.find_element_by_class_name.return_value = element
+        element.screenshot_as_png = read_fixture('sample.png')
+
+        schedule = db.session.query(SliceEmailSchedule).filter_by(
+            id=self.slice_schedule).all()[0]
+
+        schedule.email_format = SliceEmailReportFormat.visualization
+        schedule.delivery_type = EmailDeliveryType.attachment
+
+        deliver_slice(schedule)
+        mtime.sleep.assert_called_once()
+        driver.screenshot.assert_not_called()
+        send_email_smtp.assert_called_once()
+
+        self.assertEqual(
+            send_email_smtp.call_args[1]['data']['screenshot.png'],
+            element.screenshot_as_png,
+        )
+
+    @patch('superset.tasks.schedules.requests.get')
+    @patch('superset.tasks.schedules.send_email_smtp')
+    def test_deliver_slice_csv_attachment(self, send_email_smtp, get):
+        response = Mock()
+        get.return_value = response
+        response.raise_for_status.return_value = None
+        response.content = self.CSV
+
+        schedule = db.session.query(SliceEmailSchedule).filter_by(
+            id=self.slice_schedule).all()[0]
+
+        schedule.email_format = SliceEmailReportFormat.data
+        schedule.delivery_type = EmailDeliveryType.attachment
+
+        deliver_slice(schedule)
+        send_email_smtp.assert_called_once()
+
+        file_name = __('%(name)s.csv', name=schedule.slice.slice_name)
+
+        self.assertEqual(
+            send_email_smtp.call_args[1]['data'][file_name],
+            self.CSV,
+        )
+
+    @patch('superset.tasks.schedules.requests.get')
+    @patch('superset.tasks.schedules.send_email_smtp')
+    def test_deliver_slice_csv_inline(self, send_email_smtp, get):
+        response = Mock()
+        get.return_value = response
+        response.raise_for_status.return_value = None
+        response.content = self.CSV
+
+        schedule = db.session.query(SliceEmailSchedule).filter_by(
+            id=self.slice_schedule).all()[0]
+
+        schedule.email_format = SliceEmailReportFormat.data
+        schedule.delivery_type = EmailDeliveryType.inline
+
+        deliver_slice(schedule)
+        send_email_smtp.assert_called_once()
+
+        self.assertIsNone(send_email_smtp.call_args[1]['data'])
+        self.assertTrue('<table ' in send_email_smtp.call_args[0][2])
diff --git a/tests/utils.py b/tests/utils.py
index 59ec1d7..a54968f 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -4,6 +4,10 @@ from os import path
FIXTURES_DIR = 'tests/fixtures'
+def read_fixture(fixture_file_name):
+    """Return the raw bytes of the named file under FIXTURES_DIR."""
+    fixture_path = path.join(FIXTURES_DIR, fixture_file_name)
+    with open(fixture_path, 'rb') as handle:
+        return handle.read()
+
+
def load_fixture(fixture_file_name):
- with open(path.join(FIXTURES_DIR, fixture_file_name)) as fixture_file:
- return json.load(fixture_file)
+ return json.loads(read_fixture(fixture_file_name))