You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/06/21 06:48:37 UTC

incubator-airflow git commit: [AIRFLOW-263] Remove temp backtick file

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 7992036f5 -> f4c44146c


[AIRFLOW-263] Remove temp backtick file

Closes #1613 from artwr/artwr_remove_backtick_file

[AIRFLOW-263] Remove temp backtick file

This removes the file erroneously introduced by the highcharts
refactor.

Solves AIRFLOW-263


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f4c44146
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f4c44146
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f4c44146

Branch: refs/heads/master
Commit: f4c44146cd38afe21c517230bff71d33ab041246
Parents: 7992036
Author: Arthur Wiedmer <ar...@gmail.com>
Authored: Tue Jun 21 08:48:23 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jun 21 08:48:23 2016 +0200

----------------------------------------------------------------------
 ` | 2347 ----------------------------------------------------------------
 1 file changed, 2347 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f4c44146/`
----------------------------------------------------------------------
diff --git a/` b/`
deleted file mode 100644
index 6331805..0000000
--- a/`
+++ /dev/null
@@ -1,2347 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import sys
-
-import os
-import socket
-import importlib
-
-from functools import wraps
-from datetime import datetime, timedelta
-import dateutil.parser
-import copy
-from itertools import chain, product
-
-from past.utils import old_div
-from past.builtins import basestring
-
-import inspect
-import traceback
-
-import sqlalchemy as sqla
-from sqlalchemy import or_, desc, and_
-
-
-from flask import redirect, url_for, request, Markup, Response, current_app, render_template
-from flask_admin import BaseView, expose, AdminIndexView
-from flask_admin.contrib.sqla import ModelView
-from flask_admin.actions import action
-from flask_login import flash
-from flask._compat import PY2
-
-import jinja2
-import markdown
-import json
-
-from wtforms import (
-    Form, SelectField, TextAreaField, PasswordField, StringField)
-
-from pygments import highlight, lexers
-from pygments.formatters import HtmlFormatter
-
-import airflow
-from airflow import configuration as conf
-from airflow import models
-from airflow import settings
-from airflow.exceptions import AirflowException
-from airflow.settings import Session
-from airflow.models import XCom
-
-from airflow.utils.json import json_ser
-from airflow.utils.state import State
-from airflow.utils.db import provide_session
-from airflow.utils.helpers import alchemy_to_dict
-from airflow.utils import logging as log_utils
-from airflow.www import utils as wwwutils
-from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm
-
-QUERY_LIMIT = 100000
-CHART_LIMIT = 200000
-
-dagbag = models.DagBag(os.path.expanduser(conf.get('core', 'DAGS_FOLDER')))
-
-login_required = airflow.login.login_required
-current_user = airflow.login.current_user
-logout_user = airflow.login.logout_user
-
-FILTER_BY_OWNER = False
-if conf.getboolean('webserver', 'FILTER_BY_OWNER'):
-    # filter_by_owner if authentication is enabled and filter_by_owner is true
-    FILTER_BY_OWNER = not current_app.config['LOGIN_DISABLED']
-
-
-def dag_link(v, c, m, p):
-    url = url_for(
-        'airflow.graph',
-        dag_id=m.dag_id)
-    return Markup(
-        '<a href="{url}">{m.dag_id}</a>'.format(**locals()))
-
-
-def log_link(v, c, m, p):
-    url = url_for(
-        'airflow.log',
-        dag_id=m.dag_id,
-        task_id=m.task_id,
-        execution_date=m.execution_date.isoformat())
-    return Markup(
-        '<a href="{url}">'
-        '    <span class="glyphicon glyphicon-book" aria-hidden="true">'
-        '</span></a>').format(**locals())
-
-
-def task_instance_link(v, c, m, p):
-    url = url_for(
-        'airflow.task',
-        dag_id=m.dag_id,
-        task_id=m.task_id,
-        execution_date=m.execution_date.isoformat())
-    url_root = url_for(
-        'airflow.graph',
-        dag_id=m.dag_id,
-        root=m.task_id,
-        execution_date=m.execution_date.isoformat())
-    return Markup(
-        """
-        <span style="white-space: nowrap;">
-        <a href="{url}">{m.task_id}</a>
-        <a href="{url_root}" title="Filter on this task and upstream">
-        <span class="glyphicon glyphicon-filter" style="margin-left: 0px;"
-            aria-hidden="true"></span>
-        </a>
-        </span>
-        """.format(**locals()))
-
-
-def state_token(state):
-    color = State.color(state)
-    return Markup(
-        '<span class="label" style="background-color:{color};">'
-        '{state}</span>'.format(**locals()))
-
-
-def state_f(v, c, m, p):
-    return state_token(m.state)
-
-
-def duration_f(v, c, m, p):
-    if m.end_date and m.duration:
-        return timedelta(seconds=m.duration)
-
-
-def datetime_f(v, c, m, p):
-    attr = getattr(m, p)
-    dttm = attr.isoformat() if attr else ''
-    if datetime.now().isoformat()[:4] == dttm[:4]:
-        dttm = dttm[5:]
-    return Markup("<nobr>{}</nobr>".format(dttm))
-
-
-def nobr_f(v, c, m, p):
-    return Markup("<nobr>{}</nobr>".format(getattr(m, p)))
-
-
-def label_link(v, c, m, p):
-    try:
-        default_params = eval(m.default_params)
-    except:
-        default_params = {}
-    url = url_for(
-        'airflow.chart', chart_id=m.id, iteration_no=m.iteration_no,
-        **default_params)
-    return Markup("<a href='{url}'>{m.label}</a>".format(**locals()))
-
-
-def pool_link(v, c, m, p):
-    url = '/admin/taskinstance/?flt1_pool_equals=' + m.pool
-    return Markup("<a href='{url}'>{m.pool}</a>".format(**locals()))
-
-
-def pygment_html_render(s, lexer=lexers.TextLexer):
-    return highlight(
-        s,
-        lexer(),
-        HtmlFormatter(linenos=True),
-    )
-
-
-def render(obj, lexer):
-    out = ""
-    if isinstance(obj, basestring):
-        out += pygment_html_render(obj, lexer)
-    elif isinstance(obj, (tuple, list)):
-        for i, s in enumerate(obj):
-            out += "<div>List item #{}</div>".format(i)
-            out += "<div>" + pygment_html_render(s, lexer) + "</div>"
-    elif isinstance(obj, dict):
-        for k, v in obj.items():
-            out += '<div>Dict item "{}"</div>'.format(k)
-            out += "<div>" + pygment_html_render(v, lexer) + "</div>"
-    return out
-
-
-def wrapped_markdown(s):
-    return '<div class="rich_doc">' + markdown.markdown(s) + "</div>"
-
-
-attr_renderer = {
-    'bash_command': lambda x: render(x, lexers.BashLexer),
-    'hql': lambda x: render(x, lexers.SqlLexer),
-    'sql': lambda x: render(x, lexers.SqlLexer),
-    'doc': lambda x: render(x, lexers.TextLexer),
-    'doc_json': lambda x: render(x, lexers.JsonLexer),
-    'doc_rst': lambda x: render(x, lexers.RstLexer),
-    'doc_yaml': lambda x: render(x, lexers.YamlLexer),
-    'doc_md': wrapped_markdown,
-    'python_callable': lambda x: render(
-        inspect.getsource(x), lexers.PythonLexer),
-}
-
-
-def data_profiling_required(f):
-    '''
-    Decorator for views requiring data profiling access
-    '''
-    @wraps(f)
-    def decorated_function(*args, **kwargs):
-        if (
-                    current_app.config['LOGIN_DISABLED'] or
-                    (not current_user.is_anonymous() and current_user.data_profiling())
-        ):
-            return f(*args, **kwargs)
-        else:
-            flash("This page requires data profiling privileges", "error")
-            return redirect(url_for('admin.index'))
-    return decorated_function
-
-
-def fused_slots(v, c, m, p):
-    url = (
-        '/admin/taskinstance/' +
-        '?flt1_pool_equals=' + m.pool +
-        '&flt2_state_equals=running')
-    return Markup("<a href='{0}'>{1}</a>".format(url, m.used_slots()))
-
-
-def fqueued_slots(v, c, m, p):
-    url = (
-        '/admin/taskinstance/' +
-        '?flt1_pool_equals=' + m.pool +
-        '&flt2_state_equals=queued&sort=10&desc=1')
-    return Markup("<a href='{0}'>{1}</a>".format(url, m.queued_slots()))
-
-
-class Airflow(BaseView):
-
-    def is_visible(self):
-        return False
-
-    @expose('/')
-    @login_required
-    def index(self):
-        return self.render('airflow/dags.html')
-
-    @expose('/chart_data')
-    @data_profiling_required
-    @wwwutils.gzipped
-    # @cache.cached(timeout=3600, key_prefix=wwwutils.make_cache_key)
-    def chart_data(self):
-        session = settings.Session()
-        chart_id = request.args.get('chart_id')
-        csv = request.args.get('csv') == "true"
-        chart = session.query(models.Chart).filter_by(id=chart_id).first()
-        db = session.query(
-            models.Connection).filter_by(conn_id=chart.conn_id).first()
-        session.expunge_all()
-        session.commit()
-        session.close()
-
-        payload = {}
-        payload['state'] = 'ERROR'
-        payload['error'] = ''
-
-        # Processing templated fields
-        try:
-            args = eval(chart.default_params)
-            if type(args) is not type(dict()):
-                raise AirflowException('Not a dict')
-        except:
-            args = {}
-            payload['error'] += (
-                "Default params is not valid, string has to evaluate as "
-                "a Python dictionary. ")
-
-        request_dict = {k: request.args.get(k) for k in request.args}
-        from airflow import macros
-        args.update(request_dict)
-        args['macros'] = macros
-        sql = jinja2.Template(chart.sql).render(**args)
-        label = jinja2.Template(chart.label).render(**args)
-        payload['sql_html'] = Markup(highlight(
-            sql,
-            lexers.SqlLexer(),  # Lexer call
-            HtmlFormatter(noclasses=True))
-        )
-        payload['label'] = label
-
-        import pandas as pd
-        pd.set_option('display.max_colwidth', 100)
-        hook = db.get_hook()
-        try:
-            df = hook.get_pandas_df(wwwutils.limit_sql(sql, CHART_LIMIT, conn_type=db.conn_type))
-            df = df.fillna(0)
-        except Exception as e:
-            payload['error'] += "SQL execution failed. Details: " + str(e)
-
-        if csv:
-            return Response(
-                response=df.to_csv(index=False),
-                status=200,
-                mimetype="application/text")
-
-        if not payload['error'] and len(df) == CHART_LIMIT:
-            payload['warning'] = (
-                "Data has been truncated to {0}"
-                " rows. Expect incomplete results.").format(CHART_LIMIT)
-
-        if not payload['error'] and len(df) == 0:
-            payload['error'] += "Empty result set. "
-        elif (
-                            not payload['error'] and
-                                chart.sql_layout == 'series' and
-                            chart.chart_type != "datatable" and
-                        len(df.columns) < 3):
-            payload['error'] += "SQL needs to return at least 3 columns. "
-        elif (
-                        not payload['error'] and
-                            chart.sql_layout == 'columns'and
-                        len(df.columns) < 2):
-            payload['error'] += "SQL needs to return at least 2 columns. "
-        elif not payload['error']:
-            import numpy as np
-            chart_type = chart.chart_type
-
-            data = None
-            if chart_type == "datatable":
-                chart.show_datatable = True
-            if chart.show_datatable:
-                data = df.to_dict(orient="split")
-                data['columns'] = [{'title': c} for c in data['columns']]
-
-            # Trying to convert time to something Highcharts likes
-            x_col = 1 if chart.sql_layout == 'series' else 0
-            if chart.x_is_date:
-                try:
-                    # From string to datetime
-                    df[df.columns[x_col]] = pd.to_datetime(
-                        df[df.columns[x_col]])
-                except Exception as e:
-                    raise AirflowException(str(e))
-                df[df.columns[x_col]] = df[df.columns[x_col]].apply(
-                    lambda x: int(x.strftime("%s")) * 1000)
-
-            series = []
-            colorAxis = None
-            if chart_type == 'datatable':
-                payload['data'] = data
-                payload['state'] = 'SUCCESS'
-                return wwwutils.json_response(payload)
-
-            elif chart_type == 'para':
-                df.rename(columns={
-                    df.columns[0]: 'name',
-                    df.columns[1]: 'group',
-                }, inplace=True)
-                return Response(
-                    response=df.to_csv(index=False),
-                    status=200,
-                    mimetype="application/text")
-
-            elif chart_type == 'heatmap':
-                color_perc_lbound = float(
-                    request.args.get('color_perc_lbound', 0))
-                color_perc_rbound = float(
-                    request.args.get('color_perc_rbound', 1))
-                color_scheme = request.args.get('color_scheme', 'blue_red')
-
-                if color_scheme == 'blue_red':
-                    stops = [
-                        [color_perc_lbound, '#00D1C1'],
-                        [
-                            color_perc_lbound +
-                            ((color_perc_rbound - color_perc_lbound)/2),
-                            '#FFFFCC'
-                        ],
-                        [color_perc_rbound, '#FF5A5F']
-                    ]
-                elif color_scheme == 'blue_scale':
-                    stops = [
-                        [color_perc_lbound, '#FFFFFF'],
-                        [color_perc_rbound, '#2222FF']
-                    ]
-                elif color_scheme == 'fire':
-                    diff = float(color_perc_rbound - color_perc_lbound)
-                    stops = [
-                        [color_perc_lbound, '#FFFFFF'],
-                        [color_perc_lbound + 0.33*diff, '#FFFF00'],
-                        [color_perc_lbound + 0.66*diff, '#FF0000'],
-                        [color_perc_rbound, '#000000']
-                    ]
-                else:
-                    stops = [
-                        [color_perc_lbound, '#FFFFFF'],
-                        [
-                            color_perc_lbound +
-                            ((color_perc_rbound - color_perc_lbound)/2),
-                            '#888888'
-                        ],
-                        [color_perc_rbound, '#000000'],
-                    ]
-
-                xaxis_label = df.columns[1]
-                yaxis_label = df.columns[2]
-                data = []
-                for row in df.itertuples():
-                    data.append({
-                        'x': row[2],
-                        'y': row[3],
-                        'value': row[4],
-                    })
-                x_format = '{point.x:%Y-%m-%d}' \
-                    if chart.x_is_date else '{point.x}'
-                series.append({
-                    'data': data,
-                    'borderWidth': 0,
-                    'colsize': 24 * 36e5,
-                    'turboThreshold': sys.float_info.max,
-                    'tooltip': {
-                        'headerFormat': '',
-                        'pointFormat': (
-                            df.columns[1] + ': ' + x_format + '<br/>' +
-                            df.columns[2] + ': {point.y}<br/>' +
-                            df.columns[3] + ': <b>{point.value}</b>'
-                        ),
-                    },
-                })
-                colorAxis = {
-                    'stops': stops,
-                    'minColor': '#FFFFFF',
-                    'maxColor': '#000000',
-                    'min': 50,
-                    'max': 2200,
-                }
-            else:
-                if chart.sql_layout == 'series':
-                    # User provides columns (series, x, y)
-                    xaxis_label = df.columns[1]
-                    yaxis_label = df.columns[2]
-                    df[df.columns[2]] = df[df.columns[2]].astype(np.float)
-                    df = df.pivot_table(
-                        index=df.columns[1],
-                        columns=df.columns[0],
-                        values=df.columns[2], aggfunc=np.sum)
-                else:
-                    # User provides columns (x, y, metric1, metric2, ...)
-                    xaxis_label = df.columns[0]
-                    yaxis_label = 'y'
-                    df.index = df[df.columns[0]]
-                    df = df.sort(df.columns[0])
-                    del df[df.columns[0]]
-                    for col in df.columns:
-                        df[col] = df[col].astype(np.float)
-
-                for col in df.columns:
-                    series.append({
-                        'name': col,
-                        'data': [
-                            (k, df[col][k])
-                            for k in df[col].keys()
-                            if not np.isnan(df[col][k])]
-                    })
-                series = [serie for serie in sorted(
-                    series, key=lambda s: s['data'][0][1], reverse=True)]
-
-            if chart_type == "stacked_area":
-                stacking = "normal"
-                chart_type = 'area'
-            elif chart_type == "percent_area":
-                stacking = "percent"
-                chart_type = 'area'
-            else:
-                stacking = None
-            hc = {
-                'chart': {
-                    'type': chart_type
-                },
-                'plotOptions': {
-                    'series': {
-                        'marker': {
-                            'enabled': False
-                        }
-                    },
-                    'area': {'stacking': stacking},
-                },
-                'title': {'text': ''},
-                'xAxis': {
-                    'title': {'text': xaxis_label},
-                    'type': 'datetime' if chart.x_is_date else None,
-                },
-                'yAxis': {
-                    'title': {'text': yaxis_label},
-                },
-                'colorAxis': colorAxis,
-                'tooltip': {
-                    'useHTML': True,
-                    'backgroundColor': None,
-                    'borderWidth': 0,
-                },
-                'series': series,
-            }
-
-            if chart.y_log_scale:
-                hc['yAxis']['type'] = 'logarithmic'
-                hc['yAxis']['minorTickInterval'] = 0.1
-                if 'min' in hc['yAxis']:
-                    del hc['yAxis']['min']
-
-            payload['state'] = 'SUCCESS'
-            payload['hc'] = hc
-            payload['data'] = data
-            payload['request_dict'] = request_dict
-        return wwwutils.json_response(payload)
-
-    @expose('/chart')
-    @data_profiling_required
-    def chart(self):
-        session = settings.Session()
-        chart_id = request.args.get('chart_id')
-        embed = request.args.get('embed')
-        chart = session.query(models.Chart).filter_by(id=chart_id).first()
-        session.expunge_all()
-        session.commit()
-        session.close()
-        if chart.chart_type == 'para':
-            return self.render('airflow/para/para.html', chart=chart)
-
-        sql = ""
-        if chart.show_sql:
-            sql = Markup(highlight(
-                chart.sql,
-                lexers.SqlLexer(),  # Lexer call
-                HtmlFormatter(noclasses=True))
-            )
-        return self.render(
-            'airflow/highchart.html',
-            chart=chart,
-            title="Airflow - Chart",
-            sql=sql,
-            label=chart.label,
-            embed=embed)
-
-    @expose('/dag_stats')
-    #@login_required
-    def dag_stats(self):
-        states = [
-            State.SUCCESS,
-            State.RUNNING,
-            State.FAILED,
-            State.UPSTREAM_FAILED,
-            State.UP_FOR_RETRY,
-            State.QUEUED,
-        ]
-        task_ids = []
-        dag_ids = []
-        for dag in dagbag.dags.values():
-            task_ids += dag.task_ids
-            if not dag.is_subdag:
-                dag_ids.append(dag.dag_id)
-
-        TI = models.TaskInstance
-        DagRun = models.DagRun
-        session = Session()
-
-        LastDagRun = (
-            session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
-            .group_by(DagRun.dag_id)
-            .subquery('last_dag_run')
-        )
-        RunningDagRun = (
-            session.query(DagRun.dag_id, DagRun.execution_date)
-            .filter(DagRun.state == State.RUNNING)
-            .subquery('running_dag_run')
-        )
-
-        # Select all task_instances from active dag_runs.
-        # If no dag_run is active, return task instances from most recent dag_run.
-        qry = (
-            session.query(TI.dag_id, TI.state, sqla.func.count(TI.task_id))
-            .outerjoin(RunningDagRun, and_(
-                RunningDagRun.c.dag_id == TI.dag_id,
-                RunningDagRun.c.execution_date == TI.execution_date)
-            )
-            .outerjoin(LastDagRun, and_(
-                LastDagRun.c.dag_id == TI.dag_id,
-                LastDagRun.c.execution_date == TI.execution_date)
-            )
-            .filter(TI.task_id.in_(task_ids))
-            .filter(TI.dag_id.in_(dag_ids))
-            .filter(or_(
-                RunningDagRun.c.dag_id != None,
-                LastDagRun.c.dag_id != None
-            ))
-            .group_by(TI.dag_id, TI.state)
-        )
-
-        data = {}
-        for dag_id, state, count in qry:
-            if dag_id not in data:
-                data[dag_id] = {}
-            data[dag_id][state] = count
-        session.commit()
-        session.close()
-
-        payload = {}
-        for dag in dagbag.dags.values():
-            payload[dag.safe_dag_id] = []
-            for state in states:
-                try:
-                    count = data[dag.dag_id][state]
-                except:
-                    count = 0
-                d = {
-                    'state': state,
-                    'count': count,
-                    'dag_id': dag.dag_id,
-                    'color': State.color(state)
-                }
-                payload[dag.safe_dag_id].append(d)
-        return wwwutils.json_response(payload)
-
-
-    @expose('/code')
-    @login_required
-    def code(self):
-        dag_id = request.args.get('dag_id')
-        dag = dagbag.get_dag(dag_id)
-        title = dag_id
-        try:
-            m = importlib.import_module(dag.module_name)
-            code = inspect.getsource(m)
-            html_code = highlight(
-                code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
-        except IOError as e:
-            html_code = str(e)
-
-        return self.render(
-            'airflow/dag_code.html', html_code=html_code, dag=dag, title=title,
-            root=request.args.get('root'),
-            demo_mode=conf.getboolean('webserver', 'demo_mode'))
-
-    @expose('/dag_details')
-    @login_required
-    def dag_details(self):
-        dag_id = request.args.get('dag_id')
-        dag = dagbag.get_dag(dag_id)
-        title = "DAG details"
-
-        session = settings.Session()
-        TI = models.TaskInstance
-        states = (
-            session.query(TI.state, sqla.func.count(TI.dag_id))
-            .filter(TI.dag_id == dag_id)
-            .group_by(TI.state)
-            .all()
-        )
-        return self.render(
-            'airflow/dag_details.html',
-            dag=dag, title=title, states=states, State=State)
-
-    @current_app.errorhandler(404)
-    def circles(self):
-        return render_template(
-            'airflow/circles.html', hostname=socket.gethostname()), 404
-
-    @current_app.errorhandler(500)
-    def show_traceback(self):
-        from airflow.utils import asciiart as ascii_
-        return render_template(
-            'airflow/traceback.html',
-            hostname=socket.gethostname(),
-            nukular=ascii_.nukular,
-            info=traceback.format_exc()), 500
-
-    @expose('/sandbox')
-    @login_required
-    def sandbox(self):
-        title = "Sandbox Suggested Configuration"
-        cfg_loc = conf.AIRFLOW_CONFIG + '.sandbox'
-        f = open(cfg_loc, 'r')
-        config = f.read()
-        f.close()
-        code_html = Markup(highlight(
-            config,
-            lexers.IniLexer(),  # Lexer call
-            HtmlFormatter(noclasses=True))
-        )
-        return self.render(
-            'airflow/code.html',
-            code_html=code_html, title=title, subtitle=cfg_loc)
-
-    @expose('/noaccess')
-    def noaccess(self):
-        return self.render('airflow/noaccess.html')
-
-    @expose('/headers')
-    def headers(self):
-        d = {
-            'headers': {k: v for k, v in request.headers},
-        }
-        if hasattr(current_user, 'is_superuser'):
-            d['is_superuser'] = current_user.is_superuser()
-            d['data_profiling'] = current_user.data_profiling()
-            d['is_anonymous'] = current_user.is_anonymous()
-            d['is_authenticated'] = current_user.is_authenticated()
-        if hasattr(current_user, 'username'):
-            d['username'] = current_user.username
-        return wwwutils.json_response(d)
-
-    @expose('/pickle_info')
-    def pickle_info(self):
-        d = {}
-        dag_id = request.args.get('dag_id')
-        dags = [dagbag.dags.get(dag_id)] if dag_id else dagbag.dags.values()
-        for dag in dags:
-            if not dag.is_subdag:
-                d[dag.dag_id] = dag.pickle_info()
-        return wwwutils.json_response(d)
-
-    @expose('/login', methods=['GET', 'POST'])
-    def login(self):
-        return airflow.login.login(self, request)
-
-    @expose('/logout')
-    def logout(self):
-        logout_user()
-        flash('You have been logged out.')
-        return redirect(url_for('admin.index'))
-
-    @expose('/rendered')
-    @login_required
-    @wwwutils.action_logging
-    def rendered(self):
-        dag_id = request.args.get('dag_id')
-        task_id = request.args.get('task_id')
-        execution_date = request.args.get('execution_date')
-        dttm = dateutil.parser.parse(execution_date)
-        form = DateTimeForm(data={'execution_date': dttm})
-        dag = dagbag.get_dag(dag_id)
-        task = copy.copy(dag.get_task(task_id))
-        ti = models.TaskInstance(task=task, execution_date=dttm)
-        try:
-            ti.render_templates()
-        except Exception as e:
-            flash("Error rendering template: " + str(e), "error")
-        title = "Rendered Template"
-        html_dict = {}
-        for template_field in task.__class__.template_fields:
-            content = getattr(task, template_field)
-            if template_field in attr_renderer:
-                html_dict[template_field] = attr_renderer[template_field](content)
-            else:
-                html_dict[template_field] = (
-                    "<pre><code>" + str(content) + "</pre></code>")
-
-        return self.render(
-            'airflow/ti_code.html',
-            html_dict=html_dict,
-            dag=dag,
-            task_id=task_id,
-            execution_date=execution_date,
-            form=form,
-            title=title,)
-
-    @expose('/log')
-    @login_required
-    @wwwutils.action_logging
-    def log(self):
-        BASE_LOG_FOLDER = os.path.expanduser(
-            conf.get('core', 'BASE_LOG_FOLDER'))
-        dag_id = request.args.get('dag_id')
-        task_id = request.args.get('task_id')
-        execution_date = request.args.get('execution_date')
-        dag = dagbag.get_dag(dag_id)
-        log_relative = "{dag_id}/{task_id}/{execution_date}".format(
-            **locals())
-        loc = os.path.join(BASE_LOG_FOLDER, log_relative)
-        loc = loc.format(**locals())
-        log = ""
-        TI = models.TaskInstance
-        session = Session()
-        dttm = dateutil.parser.parse(execution_date)
-        ti = session.query(TI).filter(
-            TI.dag_id == dag_id, TI.task_id == task_id,
-            TI.execution_date == dttm).first()
-        dttm = dateutil.parser.parse(execution_date)
-        form = DateTimeForm(data={'execution_date': dttm})
-
-        if ti:
-            host = ti.hostname
-            log_loaded = False
-
-            if socket.gethostname() == host:
-                try:
-                    f = open(loc)
-                    log += "".join(f.readlines())
-                    f.close()
-                    log_loaded = True
-                except:
-                    log = "*** Local log file not found.\n".format(loc)
-            else:
-                WORKER_LOG_SERVER_PORT = \
-                    conf.get('celery', 'WORKER_LOG_SERVER_PORT')
-                url = os.path.join(
-                    "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
-                    ).format(**locals())
-                log += "*** Log file isn't local.\n"
-                log += "*** Fetching here: {url}\n".format(**locals())
-                try:
-                    import requests
-                    log += '\n' + requests.get(url).text
-                    log_loaded = True
-                except:
-                    log += "*** Failed to fetch log file from worker.\n".format(
-                        **locals())
-
-            if not log_loaded:
-                # load remote logs
-                remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
-                remote_log = os.path.join(remote_log_base, log_relative)
-                log += '\n*** Reading remote logs...\n'
-
-                # S3
-                if remote_log.startswith('s3:/'):
-                    log += log_utils.S3Log().read(remote_log, return_error=True)
-
-                # GCS
-                elif remote_log.startswith('gs:/'):
-                    log += log_utils.GCSLog().read(remote_log, return_error=True)
-
-                # unsupported
-                elif remote_log:
-                    log += '*** Unsupported remote log location.'
-
-            session.commit()
-            session.close()
-
-        if PY2 and not isinstance(log, unicode):
-            log = log.decode('utf-8')
-
-        title = "Log"
-
-        return self.render(
-            'airflow/ti_code.html',
-            code=log, dag=dag, title=title, task_id=task_id,
-            execution_date=execution_date, form=form)
-
-    @expose('/task')
-    @login_required
-    @wwwutils.action_logging
-    def task(self):
-        dag_id = request.args.get('dag_id')
-        task_id = request.args.get('task_id')
-        # Carrying execution_date through, even though it's irrelevant for
-        # this context
-        execution_date = request.args.get('execution_date')
-        dttm = dateutil.parser.parse(execution_date)
-        form = DateTimeForm(data={'execution_date': dttm})
-        dag = dagbag.get_dag(dag_id)
-        if not dag or task_id not in dag.task_ids:
-            flash(
-                "Task [{}.{}] doesn't seem to exist"
-                " at the moment".format(dag_id, task_id),
-                "error")
-            return redirect('/admin/')
-        task = dag.get_task(task_id)
-        task = copy.copy(task)
-        task.resolve_template_files()
-
-        attributes = []
-        for attr_name in dir(task):
-            if not attr_name.startswith('_'):
-                attr = getattr(task, attr_name)
-                if type(attr) != type(self.task) and \
-                                attr_name not in attr_renderer:
-                    attributes.append((attr_name, str(attr)))
-
-        title = "Task Details"
-        # Color coding the special attributes that are code
-        special_attrs_rendered = {}
-        for attr_name in attr_renderer:
-            if hasattr(task, attr_name):
-                source = getattr(task, attr_name)
-                special_attrs_rendered[attr_name] = attr_renderer[attr_name](source)
-
-        return self.render(
-            'airflow/task.html',
-            attributes=attributes,
-            task_id=task_id,
-            execution_date=execution_date,
-            special_attrs_rendered=special_attrs_rendered,
-            form=form,
-            dag=dag, title=title)
-
-    @expose('/xcom')
-    @login_required
-    @wwwutils.action_logging
-    def xcom(self):
-        dag_id = request.args.get('dag_id')
-        task_id = request.args.get('task_id')
-        # Carrying execution_date through, even though it's irrelevant for
-        # this context
-        execution_date = request.args.get('execution_date')
-        dttm = dateutil.parser.parse(execution_date)
-        form = DateTimeForm(data={'execution_date': dttm})
-        dag = dagbag.get_dag(dag_id)
-        if not dag or task_id not in dag.task_ids:
-            flash(
-                "Task [{}.{}] doesn't seem to exist"
-                " at the moment".format(dag_id, task_id),
-                "error")
-            return redirect('/admin/')
-
-        session = Session()
-        xcomlist = session.query(XCom).filter(
-            XCom.dag_id == dag_id, XCom.task_id == task_id,
-            XCom.execution_date == dttm).all()
-
-        attributes = []
-        for xcom in xcomlist:
-            if not xcom.key.startswith('_'):
-                attributes.append((xcom.key, xcom.value))
-
-        title = "XCom"
-        return self.render(
-            'airflow/xcom.html',
-            attributes=attributes,
-            task_id=task_id,
-            execution_date=execution_date,
-            form=form,
-            dag=dag, title=title)\
-
-    @expose('/run')
-    @login_required
-    @wwwutils.action_logging
-    @wwwutils.notify_owner
-    def run(self):
-        dag_id = request.args.get('dag_id')
-        task_id = request.args.get('task_id')
-        origin = request.args.get('origin')
-        dag = dagbag.get_dag(dag_id)
-        task = dag.get_task(task_id)
-
-        execution_date = request.args.get('execution_date')
-        execution_date = dateutil.parser.parse(execution_date)
-        force = request.args.get('force') == "true"
-        deps = request.args.get('deps') == "true"
-
-        try:
-            from airflow.executors import DEFAULT_EXECUTOR as executor
-            from airflow.executors import CeleryExecutor
-            if not isinstance(executor, CeleryExecutor):
-                flash("Only works with the CeleryExecutor, sorry", "error")
-                return redirect(origin)
-        except ImportError:
-            # in case CeleryExecutor cannot be imported it is not active either
-            flash("Only works with the CeleryExecutor, sorry", "error")
-            return redirect(origin)
-
-        ti = models.TaskInstance(task=task, execution_date=execution_date)
-        executor.start()
-        executor.queue_task_instance(
-            ti, force=force, ignore_dependencies=deps)
-        executor.heartbeat()
-        flash(
-            "Sent {} to the message queue, "
-            "it should start any moment now.".format(ti))
-        return redirect(origin)
-
-    @expose('/clear')
-    @login_required
-    @wwwutils.action_logging
-    @wwwutils.notify_owner
-    def clear(self):
-        dag_id = request.args.get('dag_id')
-        task_id = request.args.get('task_id')
-        origin = request.args.get('origin')
-        dag = dagbag.get_dag(dag_id)
-        task = dag.get_task(task_id)
-
-        execution_date = request.args.get('execution_date')
-        execution_date = dateutil.parser.parse(execution_date)
-        confirmed = request.args.get('confirmed') == "true"
-        upstream = request.args.get('upstream') == "true"
-        downstream = request.args.get('downstream') == "true"
-        future = request.args.get('future') == "true"
-        past = request.args.get('past') == "true"
-
-        dag = dag.sub_dag(
-            task_regex=r"^{0}$".format(task_id),
-            include_downstream=downstream,
-            include_upstream=upstream)
-
-        end_date = execution_date if not future else None
-        start_date = execution_date if not past else None
-        if confirmed:
-            count = dag.clear(
-                start_date=start_date,
-                end_date=end_date)
-
-            flash("{0} task instances have been cleared".format(count))
-            return redirect(origin)
-        else:
-            tis = dag.clear(
-                start_date=start_date,
-                end_date=end_date,
-                dry_run=True)
-            if not tis:
-                flash("No task instances to clear", 'error')
-                response = redirect(origin)
-            else:
-                details = "\n".join([str(t) for t in tis])
-
-                response = self.render(
-                    'airflow/confirm.html',
-                    message=(
-                        "Here's the list of task instances you are about "
-                        "to clear:"),
-                    details=details,)
-
-            return response
-
-    @expose('/blocked')
-    @login_required
-    def blocked(self):
-        session = settings.Session()
-        DR = models.DagRun
-        dags = (
-            session.query(DR.dag_id, sqla.func.count(DR.id))
-            .filter(DR.state == State.RUNNING)
-            .group_by(DR.dag_id)
-            .all()
-        )
-        payload = []
-        for dag_id, active_dag_runs in dags:
-            max_active_runs = 0
-            if dag_id in dagbag.dags:
-                max_active_runs = dagbag.dags[dag_id].max_active_runs
-            payload.append({
-                'dag_id': dag_id,
-                'active_dag_run': active_dag_runs,
-                'max_active_runs': max_active_runs,
-            })
-        return wwwutils.json_response(payload)
-
-    @expose('/success')
-    @login_required
-    @wwwutils.action_logging
-    @wwwutils.notify_owner
-    def success(self):
-        dag_id = request.args.get('dag_id')
-        task_id = request.args.get('task_id')
-        origin = request.args.get('origin')
-        dag = dagbag.get_dag(dag_id)
-        task = dag.get_task(task_id)
-
-        execution_date = request.args.get('execution_date')
-        execution_date = dateutil.parser.parse(execution_date)
-        confirmed = request.args.get('confirmed') == "true"
-        upstream = request.args.get('upstream') == "true"
-        downstream = request.args.get('downstream') == "true"
-        future = request.args.get('future') == "true"
-        past = request.args.get('past') == "true"
-        MAX_PERIODS = 1000
-
-        # Flagging tasks as successful
-        session = settings.Session()
-        task_ids = [task_id]
-        end_date = ((dag.latest_execution_date or datetime.now())
-                    if future else execution_date)
-
-        if 'start_date' in dag.default_args:
-            start_date = dag.default_args['start_date']
-        elif dag.start_date:
-            start_date = dag.start_date
-        else:
-            start_date = execution_date
-
-        start_date = execution_date if not past else start_date
-
-        if downstream:
-            task_ids += [
-                t.task_id
-                for t in task.get_flat_relatives(upstream=False)]
-        if upstream:
-            task_ids += [
-                t.task_id
-                for t in task.get_flat_relatives(upstream=True)]
-        TI = models.TaskInstance
-
-        if dag.schedule_interval == '@once':
-            dates = [start_date]
-        else:
-            dates = dag.date_range(start_date, end_date=end_date)
-
-        tis = session.query(TI).filter(
-            TI.dag_id == dag_id,
-            TI.execution_date.in_(dates),
-            TI.task_id.in_(task_ids)).all()
-        tis_to_change = session.query(TI).filter(
-            TI.dag_id == dag_id,
-            TI.execution_date.in_(dates),
-            TI.task_id.in_(task_ids),
-            TI.state != State.SUCCESS).all()
-        tasks = list(product(task_ids, dates))
-        tis_to_create = list(
-            set(tasks) -
-            set([(ti.task_id, ti.execution_date) for ti in tis]))
-
-        tis_all_altered = list(chain(
-            [(ti.task_id, ti.execution_date) for ti in tis_to_change],
-            tis_to_create))
-
-        if len(tis_all_altered) > MAX_PERIODS:
-            flash("Too many tasks at once (>{0})".format(
-                MAX_PERIODS), 'error')
-            return redirect(origin)
-
-        if confirmed:
-            for ti in tis_to_change:
-                ti.state = State.SUCCESS
-            session.commit()
-
-            for task_id, task_execution_date in tis_to_create:
-                ti = TI(
-                    task=dag.get_task(task_id),
-                    execution_date=task_execution_date,
-                    state=State.SUCCESS)
-                session.add(ti)
-                session.commit()
-
-            session.commit()
-            session.close()
-            flash("Marked success on {} task instances".format(
-                len(tis_all_altered)))
-
-            return redirect(origin)
-        else:
-            if not tis_all_altered:
-                flash("No task instances to mark as successful", 'error')
-                response = redirect(origin)
-            else:
-                tis = []
-                for task_id, task_execution_date in tis_all_altered:
-                    tis.append(TI(
-                        task=dag.get_task(task_id),
-                        execution_date=task_execution_date,
-                        state=State.SUCCESS))
-                details = "\n".join([str(t) for t in tis])
-
-                response = self.render(
-                    'airflow/confirm.html',
-                    message=(
-                        "Here's the list of task instances you are about "
-                        "to mark as successful:"),
-                    details=details,)
-            return response
-
-    @expose('/tree')
-    @login_required
-    @wwwutils.gzipped
-    @wwwutils.action_logging
-    def tree(self):
-        dag_id = request.args.get('dag_id')
-        blur = conf.getboolean('webserver', 'demo_mode')
-        dag = dagbag.get_dag(dag_id)
-        root = request.args.get('root')
-        if root:
-            dag = dag.sub_dag(
-                task_regex=root,
-                include_downstream=False,
-                include_upstream=True)
-
-        session = settings.Session()
-
-        base_date = request.args.get('base_date')
-        num_runs = request.args.get('num_runs')
-        num_runs = int(num_runs) if num_runs else 25
-
-        if base_date:
-            base_date = dateutil.parser.parse(base_date)
-        else:
-            base_date = dag.latest_execution_date or datetime.now()
-
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else datetime(2000, 1, 1)
-
-        DR = models.DagRun
-        dag_runs = (
-            session.query(DR)
-            .filter(
-                DR.dag_id==dag.dag_id,
-                DR.execution_date<=base_date,
-                DR.execution_date>=min_date)
-            .all()
-        )
-        dag_runs = {
-            dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
-
-        tis = dag.get_task_instances(
-                session, start_date=min_date, end_date=base_date)
-        dates = sorted(list({ti.execution_date for ti in tis}))
-        max_date = max([ti.execution_date for ti in tis]) if dates else None
-        task_instances = {}
-        for ti in tis:
-            tid = alchemy_to_dict(ti)
-            dr = dag_runs.get(ti.execution_date)
-            tid['external_trigger'] = dr['external_trigger'] if dr else False
-            task_instances[(ti.task_id, ti.execution_date)] = tid
-
-        expanded = []
-        # The default recursion traces every path so that tree view has full
-        # expand/collapse functionality. After 5,000 nodes we stop and fall
-        # back on a quick DFS search for performance. See PR #320.
-        node_count = [0]
-        node_limit = 5000 / max(1, len(dag.roots))
-
-        def recurse_nodes(task, visited):
-            visited.add(task)
-            node_count[0] += 1
-
-            children = [
-                recurse_nodes(t, visited) for t in task.upstream_list
-                if node_count[0] < node_limit or t not in visited]
-
-            # D3 tree uses children vs _children to define what is
-            # expanded or not. The following block makes it such that
-            # repeated nodes are collapsed by default.
-            children_key = 'children'
-            if task.task_id not in expanded:
-                expanded.append(task.task_id)
-            elif children:
-                children_key = "_children"
-
-            return {
-                'name': task.task_id,
-                'instances': [
-                        task_instances.get((task.task_id, d)) or {
-                            'execution_date': d.isoformat(),
-                            'task_id': task.task_id
-                        }
-                    for d in dates],
-                children_key: children,
-                'num_dep': len(task.upstream_list),
-                'operator': task.task_type,
-                'retries': task.retries,
-                'owner': task.owner,
-                'start_date': task.start_date,
-                'end_date': task.end_date,
-                'depends_on_past': task.depends_on_past,
-                'ui_color': task.ui_color,
-            }
-        data = {
-            'name': '[DAG]',
-            'children': [recurse_nodes(t, set()) for t in dag.roots],
-            'instances': [
-                dag_runs.get(d) or {'execution_date': d.isoformat()}
-                for d in dates],
-        }
-
-        data = json.dumps(data, indent=4, default=json_ser)
-        session.commit()
-        session.close()
-
-        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
-                                             'num_runs': num_runs})
-        return self.render(
-            'airflow/tree.html',
-            operators=sorted(
-                list(set([op.__class__ for op in dag.tasks])),
-                key=lambda x: x.__name__
-            ),
-            root=root,
-            form=form,
-            dag=dag, data=data, blur=blur)
-
-    @expose('/graph')
-    @login_required
-    @wwwutils.gzipped
-    @wwwutils.action_logging
-    def graph(self):
-        session = settings.Session()
-        dag_id = request.args.get('dag_id')
-        blur = conf.getboolean('webserver', 'demo_mode')
-        arrange = request.args.get('arrange', "LR")
-        dag = dagbag.get_dag(dag_id)
-        if dag_id not in dagbag.dags:
-            flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
-            return redirect('/admin/')
-
-        root = request.args.get('root')
-        if root:
-            dag = dag.sub_dag(
-                task_regex=root,
-                include_upstream=True,
-                include_downstream=False)
-
-        nodes = []
-        edges = []
-        for task in dag.tasks:
-            nodes.append({
-                'id': task.task_id,
-                'value': {
-                    'label': task.task_id,
-                    'labelStyle': "fill:{0};".format(task.ui_fgcolor),
-                    'style': "fill:{0};".format(task.ui_color),
-                }
-            })
-
-        def get_upstream(task):
-            for t in task.upstream_list:
-                edge = {
-                    'u': t.task_id,
-                    'v': task.task_id,
-                }
-                if edge not in edges:
-                    edges.append(edge)
-                    get_upstream(t)
-
-        for t in dag.roots:
-            get_upstream(t)
-
-        dttm = request.args.get('execution_date')
-        if dttm:
-            dttm = dateutil.parser.parse(dttm)
-        else:
-            dttm = dag.latest_execution_date or datetime.now().date()
-
-        DR = models.DagRun
-        drs = (
-            session.query(DR)
-            .filter_by(dag_id=dag_id)
-            .order_by(desc(DR.execution_date)).all()
-        )
-        dr_choices = []
-        dr_state = None
-        for dr in drs:
-            dr_choices.append((dr.execution_date.isoformat(), dr.run_id))
-            if dttm == dr.execution_date:
-                dr_state = dr.state
-
-        class GraphForm(Form):
-            execution_date = SelectField("DAG run", choices=dr_choices)
-            arrange = SelectField("Layout", choices=(
-                ('LR', "Left->Right"),
-                ('RL', "Right->Left"),
-                ('TB', "Top->Bottom"),
-                ('BT', "Bottom->Top"),
-            ))
-        form = GraphForm(
-            data={'execution_date': dttm.isoformat(), 'arrange': arrange})
-
-        task_instances = {
-            ti.task_id: alchemy_to_dict(ti)
-            for ti in dag.get_task_instances(session, dttm, dttm)}
-        tasks = {
-            t.task_id: {
-                'dag_id': t.dag_id,
-                'task_type': t.task_type,
-            }
-            for t in dag.tasks}
-        if not tasks:
-            flash("No tasks found", "error")
-        session.commit()
-        session.close()
-        doc_md = markdown.markdown(dag.doc_md) if hasattr(dag, 'doc_md') else ''
-
-        return self.render(
-            'airflow/graph.html',
-            dag=dag,
-            form=form,
-            width=request.args.get('width', "100%"),
-            height=request.args.get('height', "800"),
-            execution_date=dttm.isoformat(),
-            state_token=state_token(dr_state),
-            doc_md=doc_md,
-            arrange=arrange,
-            operators=sorted(
-                list(set([op.__class__ for op in dag.tasks])),
-                key=lambda x: x.__name__
-            ),
-            blur=blur,
-            root=root or '',
-            task_instances=json.dumps(task_instances, indent=2),
-            tasks=json.dumps(tasks, indent=2),
-            nodes=json.dumps(nodes, indent=2),
-            edges=json.dumps(edges, indent=2),)
-
-    @expose('/duration')
-    @login_required
-    @wwwutils.action_logging
-    def duration(self):
-        from nvd3 import lineChart
-        import time
-        session = settings.Session()
-        dag_id = request.args.get('dag_id')
-        dag = dagbag.get_dag(dag_id)
-        base_date = request.args.get('base_date')
-        num_runs = request.args.get('num_runs')
-        num_runs = int(num_runs) if num_runs else 25
-
-        if base_date:
-            base_date = dateutil.parser.parse(base_date)
-        else:
-            base_date = dag.latest_execution_date or datetime.now()
-
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else datetime(2000, 1, 1)
-
-        root = request.args.get('root')
-        if root:
-            dag = dag.sub_dag(
-                task_regex=root,
-                include_upstream=True,
-                include_downstream=False)
-
-        chart = lineChart(name="lineChart", x_is_date=True, height=750, width=600)
-        for task in dag.tasks:
-            y = []
-            x = []
-            for ti in task.get_task_instances(session, start_date=min_date,
-                                              end_date=base_date):
-                if ti.duration:
-                    dttm = int(time.mktime(ti.execution_date.timetuple())) * 1000
-                    x.append(dttm)
-                    y.append(float(ti.duration) / (60*60))
-            if x:
-                chart.add_serie(name=task.task_id, x=x, y=y)
-
-        tis = dag.get_task_instances(
-                session, start_date=min_date, end_date=base_date)
-        dates = sorted(list({ti.execution_date for ti in tis}))
-        max_date = max([ti.execution_date for ti in tis]) if dates else None
-
-        session.commit()
-        session.close()
-
-        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
-                                             'num_runs': num_runs})
-        chart.buildhtml()
-        return self.render(
-            'airflow/chart.html',
-            dag=dag,
-            demo_mode=conf.getboolean('webserver', 'demo_mode'),
-            root=root,
-            form=form,
-            chart=chart,
-        )
-
-    @expose('/landing_times')
-    @login_required
-    @wwwutils.action_logging
-    def landing_times(self):
-        session = settings.Session()
-        dag_id = request.args.get('dag_id')
-        dag = dagbag.get_dag(dag_id)
-        base_date = request.args.get('base_date')
-        num_runs = request.args.get('num_runs')
-        num_runs = int(num_runs) if num_runs else 25
-
-        if base_date:
-            base_date = dateutil.parser.parse(base_date)
-        else:
-            base_date = dag.latest_execution_date or datetime.now()
-
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else datetime(2000, 1, 1)
-
-        root = request.args.get('root')
-        if root:
-            dag = dag.sub_dag(
-                task_regex=root,
-                include_upstream=True,
-                include_downstream=False)
-
-        all_data = []
-        for task in dag.tasks:
-            data = []
-            for ti in task.get_task_instances(session, start_date=min_date,
-                                              end_date=base_date):
-                if ti.end_date:
-                    ts = ti.execution_date
-                    if dag.schedule_interval:
-                        ts = dag.following_schedule(ts)
-                    secs = old_div((ti.end_date - ts).total_seconds(), 60*60)
-                    data.append([ti.execution_date.isoformat(), secs])
-            all_data.append({'data': data, 'name': task.task_id})
-
-        tis = dag.get_task_instances(
-                session, start_date=min_date, end_date=base_date)
-        dates = sorted(list({ti.execution_date for ti in tis}))
-        max_date = max([ti.execution_date for ti in tis]) if dates else None
-
-        session.commit()
-        session.close()
-
-        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
-                                             'num_runs': num_runs})
-        return self.render(
-            'airflow/chart.html',
-            dag=dag,
-            data=json.dumps(all_data),
-            height="700px",
-            chart_options={'yAxis': {'title': {'text': 'hours after 00:00'}}},
-            demo_mode=conf.getboolean('webserver', 'demo_mode'),
-            root=root,
-            form=form,
-        )
-
-    @expose('/paused')
-    @login_required
-    @wwwutils.action_logging
-    def paused(self):
-        DagModel = models.DagModel
-        dag_id = request.args.get('dag_id')
-        session = settings.Session()
-        orm_dag = session.query(
-            DagModel).filter(DagModel.dag_id == dag_id).first()
-        if request.args.get('is_paused') == 'false':
-            orm_dag.is_paused = True
-        else:
-            orm_dag.is_paused = False
-        session.merge(orm_dag)
-        session.commit()
-        session.close()
-
-        dagbag.get_dag(dag_id)
-        return "OK"
-
-    @expose('/refresh')
-    @login_required
-    @wwwutils.action_logging
-    def refresh(self):
-        DagModel = models.DagModel
-        dag_id = request.args.get('dag_id')
-        session = settings.Session()
-        orm_dag = session.query(
-            DagModel).filter(DagModel.dag_id == dag_id).first()
-
-        if orm_dag:
-            orm_dag.last_expired = datetime.now()
-            session.merge(orm_dag)
-        session.commit()
-        session.close()
-
-        dagbag.get_dag(dag_id)
-        flash("DAG [{}] is now fresh as a daisy".format(dag_id))
-        return redirect('/')
-
-    @expose('/refresh_all')
-    @login_required
-    @wwwutils.action_logging
-    def refresh_all(self):
-        dagbag.collect_dags(only_if_updated=False)
-        flash("All DAGs are now up to date")
-        return redirect('/')
-
-    @expose('/gantt')
-    @login_required
-    @wwwutils.action_logging
-    def gantt(self):
-
-        session = settings.Session()
-        dag_id = request.args.get('dag_id')
-        dag = dagbag.get_dag(dag_id)
-        demo_mode = conf.getboolean('webserver', 'demo_mode')
-
-        root = request.args.get('root')
-        if root:
-            dag = dag.sub_dag(
-                task_regex=root,
-                include_upstream=True,
-                include_downstream=False)
-
-        dttm = request.args.get('execution_date')
-        if dttm:
-            dttm = dateutil.parser.parse(dttm)
-        else:
-            dttm = dag.latest_execution_date or datetime.now().date()
-
-        form = DateTimeForm(data={'execution_date': dttm})
-
-        tis = [
-            ti
-            for ti in dag.get_task_instances(session, dttm, dttm)
-            if ti.start_date]
-        tis = sorted(tis, key=lambda ti: ti.start_date)
-        tasks = []
-        data = []
-        for i, ti in enumerate(tis):
-            end_date = ti.end_date or datetime.now()
-            tasks += [ti.task_id]
-            color = State.color(ti.state)
-            data.append({
-                'x': i,
-                'low': int(ti.start_date.strftime('%s')) * 1000,
-                'high': int(end_date.strftime('%s')) * 1000,
-                'color': color,
-            })
-        height = (len(tis) * 25) + 50
-        session.commit()
-        session.close()
-
-        hc = {
-            'chart': {
-                'type': 'columnrange',
-                'inverted': True,
-                'height': height,
-            },
-            'xAxis': {'categories': tasks, 'alternateGridColor': '#FAFAFA'},
-            'yAxis': {'type': 'datetime'},
-            'title': {
-                'text': None
-            },
-            'plotOptions': {
-                'series': {
-                    'cursor': 'pointer',
-                    'minPointLength': 4,
-                },
-            },
-            'legend': {
-                'enabled': False
-            },
-            'series': [{
-                'data': data
-            }]
-        }
-        return self.render(
-            'airflow/gantt.html',
-            dag=dag,
-            execution_date=dttm.isoformat(),
-            form=form,
-            hc=json.dumps(hc, indent=4),
-            height=height,
-            demo_mode=demo_mode,
-            root=root,
-        )
-
-    @expose('/object/task_instances')
-    @login_required
-    @wwwutils.action_logging
-    def task_instances(self):
-        session = settings.Session()
-        dag_id = request.args.get('dag_id')
-        dag = dagbag.get_dag(dag_id)
-
-        dttm = request.args.get('execution_date')
-        if dttm:
-            dttm = dateutil.parser.parse(dttm)
-        else:
-            return ("Error: Invalid execution_date")
-
-        task_instances = {
-            ti.task_id: alchemy_to_dict(ti)
-            for ti in dag.get_task_instances(session, dttm, dttm)}
-
-        return json.dumps(task_instances)
-
-    @expose('/variables/<form>', methods=["GET", "POST"])
-    @login_required
-    @wwwutils.action_logging
-    def variables(self, form):
-        try:
-            if request.method == 'POST':
-                data = request.json
-                if data:
-                    session = settings.Session()
-                    var = models.Variable(key=form, val=json.dumps(data))
-                    session.add(var)
-                    session.commit()
-                return ""
-            else:
-                return self.render(
-                    'airflow/variables/{}.html'.format(form)
-                )
-        except:
-            return ("Error: form airflow/variables/{}.html "
-                    "not found.").format(form), 404
-
-
-class HomeView(AdminIndexView):
-    @expose("/")
-    @login_required
-    def index(self):
-        session = Session()
-        DM = models.DagModel
-        qry = None
-        # filter the dags if filter_by_owner and current user is not superuser
-        do_filter = FILTER_BY_OWNER and (not current_user.is_superuser())
-        if do_filter:
-            qry = (
-                session.query(DM)
-                    .filter(
-                    ~DM.is_subdag, DM.is_active,
-                    DM.owners == current_user.username)
-                    .all()
-            )
-        else:
-            qry = session.query(DM).filter(~DM.is_subdag, DM.is_active).all()
-        orm_dags = {dag.dag_id: dag for dag in qry}
-        import_errors = session.query(models.ImportError).all()
-        for ie in import_errors:
-            flash(
-                "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie),
-                "error")
-        session.expunge_all()
-        session.commit()
-        session.close()
-        dags = dagbag.dags.values()
-        if do_filter:
-            dags = {
-                dag.dag_id: dag
-                for dag in dags
-                if (
-                    dag.owner == current_user.username and (not dag.parent_dag)
-                )
-                }
-        else:
-            dags = {dag.dag_id: dag for dag in dags if not dag.parent_dag}
-        all_dag_ids = sorted(set(orm_dags.keys()) | set(dags.keys()))
-        return self.render(
-            'airflow/dags.html',
-            dags=dags,
-            orm_dags=orm_dags,
-            all_dag_ids=all_dag_ids)
-
-
-class QueryView(wwwutils.DataProfilingMixin, BaseView):
-    @expose('/')
-    @wwwutils.gzipped
-    def query(self):
-        session = settings.Session()
-        dbs = session.query(models.Connection).order_by(
-            models.Connection.conn_id).all()
-        session.expunge_all()
-        db_choices = list(
-            ((db.conn_id, db.conn_id) for db in dbs if db.get_hook()))
-        conn_id_str = request.args.get('conn_id')
-        csv = request.args.get('csv') == "true"
-        sql = request.args.get('sql')
-
-        class QueryForm(Form):
-            conn_id = SelectField("Layout", choices=db_choices)
-            sql = TextAreaField("SQL", widget=wwwutils.AceEditorWidget())
-        data = {
-            'conn_id': conn_id_str,
-            'sql': sql,
-        }
-        results = None
-        has_data = False
-        error = False
-        if conn_id_str:
-            db = [db for db in dbs if db.conn_id == conn_id_str][0]
-            hook = db.get_hook()
-            try:
-                df = hook.get_pandas_df(wwwutils.limit_sql(sql, QUERY_LIMIT, conn_type=db.conn_type))
-                # df = hook.get_pandas_df(sql)
-                has_data = len(df) > 0
-                df = df.fillna('')
-                results = df.to_html(
-                    classes=[
-                        'table', 'table-bordered', 'table-striped', 'no-wrap'],
-                    index=False,
-                    na_rep='',
-                ) if has_data else ''
-            except Exception as e:
-                flash(str(e), 'error')
-                error = True
-
-        if has_data and len(df) == QUERY_LIMIT:
-            flash(
-                "Query output truncated at " + str(QUERY_LIMIT) +
-                " rows", 'info')
-
-        if not has_data and error:
-            flash('No data', 'error')
-
-        if csv:
-            return Response(
-                response=df.to_csv(index=False),
-                status=200,
-                mimetype="application/text")
-
-        form = QueryForm(request.form, data=data)
-        session.commit()
-        session.close()
-        return self.render(
-            'airflow/query.html', form=form,
-            title="Ad Hoc Query",
-            results=results or '',
-            has_data=has_data)
-
-
-class AirflowModelView(ModelView):
-    list_template = 'airflow/model_list.html'
-    edit_template = 'airflow/model_edit.html'
-    create_template = 'airflow/model_create.html'
-    column_display_actions = True
-    page_size = 500
-
-
-class ModelViewOnly(wwwutils.LoginMixin, AirflowModelView):
-    """
-    Modifying the base ModelView class for non edit, browse only operations
-    """
-    named_filter_urls = True
-    can_create = False
-    can_edit = False
-    can_delete = False
-    column_display_pk = True
-
-
-class PoolModelView(wwwutils.SuperUserMixin, AirflowModelView):
-    column_list = ('pool', 'slots', 'used_slots', 'queued_slots')
-    column_formatters = dict(
-        pool=pool_link, used_slots=fused_slots, queued_slots=fqueued_slots)
-    named_filter_urls = True
-
-
-class SlaMissModelView(wwwutils.SuperUserMixin, ModelViewOnly):
-    verbose_name_plural = "SLA misses"
-    verbose_name = "SLA miss"
-    column_list = (
-        'dag_id', 'task_id', 'execution_date', 'email_sent', 'timestamp')
-    column_formatters = dict(
-        task_id=task_instance_link,
-        execution_date=datetime_f,
-        timestamp=datetime_f,
-        dag_id=dag_link)
-    named_filter_urls = True
-    column_searchable_list = ('dag_id', 'task_id',)
-    column_filters = (
-        'dag_id', 'task_id', 'email_sent', 'timestamp', 'execution_date')
-    form_widget_args = {
-        'email_sent': {'disabled': True},
-        'timestamp': {'disabled': True},
-    }
-
-class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView):
-    verbose_name = "chart"
-    verbose_name_plural = "charts"
-    form_columns = (
-        'label',
-        'owner',
-        'conn_id',
-        'chart_type',
-        'show_datatable',
-        'x_is_date',
-        'y_log_scale',
-        'show_sql',
-        'height',
-        'sql_layout',
-        'sql',
-        'default_params',)
-    column_list = (
-        'label', 'conn_id', 'chart_type', 'owner', 'last_modified',)
-    column_formatters = dict(label=label_link, last_modified=datetime_f)
-    column_default_sort = ('last_modified', True)
-    create_template = 'airflow/chart/create.html'
-    edit_template = 'airflow/chart/edit.html'
-    column_filters = ('label', 'owner.username', 'conn_id')
-    column_searchable_list = ('owner.username', 'label', 'sql')
-    column_descriptions = {
-        'label': "Can include {{ templated_fields }} and {{ macros }}",
-        'chart_type': "The type of chart to be displayed",
-        'sql': "Can include {{ templated_fields }} and {{ macros }}.",
-        'height': "Height of the chart, in pixels.",
-        'conn_id': "Source database to run the query against",
-        'x_is_date': (
-            "Whether the X axis should be casted as a date field. Expect most "
-            "intelligible date formats to get casted properly."
-        ),
-        'owner': (
-            "The chart's owner, mostly used for reference and filtering in "
-            "the list view."
-        ),
-        'show_datatable':
-            "Whether to display an interactive data table under the chart.",
-        'default_params': (
-            'A dictionary of {"key": "values",} that define what the '
-            'templated fields (parameters) values should be by default. '
-            'To be valid, it needs to "eval" as a Python dict. '
-            'The key values will show up in the url\'s querystring '
-            'and can be altered there.'
-        ),
-        'show_sql': "Whether to display the SQL statement as a collapsible "
-                    "section in the chart page.",
-        'y_log_scale': "Whether to use a log scale for the Y axis.",
-        'sql_layout': (
-            "Defines the layout of the SQL that the application should "
-            "expect. Depending on the tables you are sourcing from, it may "
-            "make more sense to pivot / unpivot the metrics."
-        ),
-    }
-    column_labels = {
-        'sql': "SQL",
-        'height': "Chart Height",
-        'sql_layout': "SQL Layout",
-        'show_sql': "Display the SQL Statement",
-        'default_params': "Default Parameters",
-    }
-    form_choices = {
-        'chart_type': [
-            ('line', 'Line Chart'),
-            ('spline', 'Spline Chart'),
-            ('bar', 'Bar Chart'),
-            ('para', 'Parallel Coordinates'),
-            ('column', 'Column Chart'),
-            ('area', 'Overlapping Area Chart'),
-            ('stacked_area', 'Stacked Area Chart'),
-            ('percent_area', 'Percent Area Chart'),
-            ('heatmap', 'Heatmap'),
-            ('datatable', 'No chart, data table only'),
-        ],
-        'sql_layout': [
-            ('series', 'SELECT series, x, y FROM ...'),
-            ('columns', 'SELECT x, y (series 1), y (series 2), ... FROM ...'),
-        ],
-        'conn_id': [
-            (c.conn_id, c.conn_id)
-            for c in (
-                Session().query(models.Connection.conn_id)
-                    .group_by(models.Connection.conn_id)
-            )
-            ]
-    }
-
-    def on_model_change(self, form, model, is_created=True):
-        if model.iteration_no is None:
-            model.iteration_no = 0
-        else:
-            model.iteration_no += 1
-        if not model.user_id and current_user and hasattr(current_user, 'id'):
-            model.user_id = current_user.id
-        model.last_modified = datetime.now()
-
-
-class KnowEventView(wwwutils.DataProfilingMixin, AirflowModelView):
-    verbose_name = "known event"
-    verbose_name_plural = "known events"
-    form_columns = (
-        'label',
-        'event_type',
-        'start_date',
-        'end_date',
-        'reported_by',
-        'description')
-    column_list = (
-        'label', 'event_type', 'start_date', 'end_date', 'reported_by')
-    column_default_sort = ("start_date", True)
-
-
-class KnowEventTypeView(wwwutils.DataProfilingMixin, AirflowModelView):
-    pass
-
-'''
-# For debugging / troubleshooting
-mv = KnowEventTypeView(
-    models.KnownEventType,
-    Session, name="Known Event Types", category="Manage")
-admin.add_view(mv)
-class DagPickleView(SuperUserMixin, ModelView):
-    pass
-mv = DagPickleView(
-    models.DagPickle,
-    Session, name="Pickles", category="Manage")
-admin.add_view(mv)
-'''
-
-
-class VariableView(wwwutils.LoginMixin, AirflowModelView):
-    verbose_name = "Variable"
-    verbose_name_plural = "Variables"
-    form_columns = (
-        'key',
-        'val',
-    )
-    column_list = ('key', 'is_encrypted',)
-    column_filters = ('key', 'val')
-    column_searchable_list = ('key', 'val')
-    form_widget_args = {
-        'is_encrypted': {'disabled': True},
-        'val': {
-            'rows': 20,
-        }
-    }
-
-
-class JobModelView(ModelViewOnly):
-    verbose_name_plural = "jobs"
-    verbose_name = "job"
-    column_default_sort = ('start_date', True)
-    column_filters = (
-        'job_type', 'dag_id', 'state',
-        'unixname', 'hostname', 'start_date', 'end_date', 'latest_heartbeat')
-    column_formatters = dict(
-        start_date=datetime_f,
-        end_date=datetime_f,
-        hostname=nobr_f,
-        state=state_f,
-        latest_heartbeat=datetime_f)
-
-
-class DagRunModelView(ModelViewOnly):
-    verbose_name_plural = "DAG Runs"
-    can_delete = True
-    can_edit = True
-    can_create = True
-    column_editable_list = ('state',)
-    verbose_name = "dag run"
-    column_default_sort = ('execution_date', True)
-    form_choices = {
-        'state': [
-            ('success', 'success'),
-            ('running', 'running'),
-            ('failed', 'failed'),
-        ],
-    }
-    column_list = (
-        'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger')
-    column_filters = column_list
-    column_searchable_list = ('dag_id', 'state', 'run_id')
-    column_formatters = dict(
-        execution_date=datetime_f,
-        state=state_f,
-        start_date=datetime_f,
-        dag_id=dag_link)
-
-    @action('set_running', "Set state to 'running'", None)
-    def action_set_running(self, ids):
-        self.set_dagrun_state(ids, State.RUNNING)
-
-    @action('set_failed', "Set state to 'failed'", None)
-    def action_set_failed(self, ids):
-        self.set_dagrun_state(ids, State.FAILED)
-
-    @action('set_success', "Set state to 'success'", None)
-    def action_set_success(self, ids):
-        self.set_dagrun_state(ids, State.SUCCESS)
-
-    @provide_session
-    def set_dagrun_state(self, ids, target_state, session=None):
-        try:
-            DR = models.DagRun
-            count = 0
-            for dr in session.query(DR).filter(DR.id.in_(ids)).all():
-                count += 1
-                dr.state = target_state
-                if target_state == State.RUNNING:
-                    dr.start_date = datetime.now()
-                else:
-                    dr.end_date = datetime.now()
-            session.commit()
-            flash(
-                "{count} dag runs were set to '{target_state}'".format(**locals()))
-        except Exception as ex:
-            if not self.handle_view_exception(ex):
-                raise Exception("Ooops")
-            flash('Failed to set state', 'error')
-
-
-class LogModelView(ModelViewOnly):
-    verbose_name_plural = "logs"
-    verbose_name = "log"
-    column_default_sort = ('dttm', True)
-    column_filters = ('dag_id', 'task_id', 'execution_date')
-    column_formatters = dict(
-        dttm=datetime_f, execution_date=datetime_f, dag_id=dag_link)
-
-
-class TaskInstanceModelView(ModelViewOnly):
-    verbose_name_plural = "task instances"
-    verbose_name = "task instance"
-    column_filters = (
-        'state', 'dag_id', 'task_id', 'execution_date', 'hostname',
-        'queue', 'pool', 'operator', 'start_date', 'end_date')
-    named_filter_urls = True
-    column_formatters = dict(
-        log=log_link, task_id=task_instance_link,
-        hostname=nobr_f,
-        state=state_f,
-        execution_date=datetime_f,
-        start_date=datetime_f,
-        end_date=datetime_f,
-        queued_dttm=datetime_f,
-        dag_id=dag_link, duration=duration_f)
-    column_searchable_list = ('dag_id', 'task_id', 'state')
-    column_default_sort = ('start_date', True)
-    form_choices = {
-        'state': [
-            ('success', 'success'),
-            ('running', 'running'),
-            ('failed', 'failed'),
-        ],
-    }
-    column_list = (
-        'state', 'dag_id', 'task_id', 'execution_date', 'operator',
-        'start_date', 'end_date', 'duration', 'job_id', 'hostname',
-        'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number',
-        'pool', 'log')
-    can_delete = True
-    page_size = 500
-
-    @action('set_running', "Set state to 'running'", None)
-    def action_set_running(self, ids):
-        self.set_task_instance_state(ids, State.RUNNING)
-
-    @action('set_failed', "Set state to 'failed'", None)
-    def action_set_failed(self, ids):
-        self.set_task_instance_state(ids, State.FAILED)
-
-    @action('set_success', "Set state to 'success'", None)
-    def action_set_success(self, ids):
-        self.set_task_instance_state(ids, State.SUCCESS)
-
-    @action('set_retry', "Set state to 'up_for_retry'", None)
-    def action_set_retry(self, ids):
-        self.set_task_instance_state(ids, State.UP_FOR_RETRY)
-
-    @provide_session
-    def set_task_instance_state(self, ids, target_state, session=None):
-        try:
-            TI = models.TaskInstance
-            for count, id in enumerate(ids):
-                task_id, dag_id, execution_date = id.split(',')
-                execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S')
-                ti = session.query(TI).filter(TI.task_id == task_id,
-                                              TI.dag_id == dag_id,
-                                              TI.execution_date == execution_date).one()
-                ti.state = target_state
-            count += 1
-            session.commit()
-            flash(
-                "{count} task instances were set to '{target_state}'".format(**locals()))
-        except Exception as ex:
-            if not self.handle_view_exception(ex):
-                raise Exception("Ooops")
-            flash('Failed to set state', 'error')
-
-
-class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView):
-    create_template = 'airflow/conn_create.html'
-    edit_template = 'airflow/conn_edit.html'
-    list_template = 'airflow/conn_list.html'
-    form_columns = (
-        'conn_id',
-        'conn_type',
-        'host',
-        'schema',
-        'login',
-        'password',
-        'port',
-        'extra',
-        'extra__jdbc__drv_path',
-        'extra__jdbc__drv_clsname',
-        'extra__google_cloud_platform__project',
-        'extra__google_cloud_platform__key_path',
-        'extra__google_cloud_platform__service_account',
-        'extra__google_cloud_platform__scope',
-    )
-    verbose_name = "Connection"
-    verbose_name_plural = "Connections"
-    column_default_sort = ('conn_id', False)
-    column_list = ('conn_id', 'conn_type', 'host', 'port', 'is_encrypted', 'is_extra_encrypted',)
-    form_overrides = dict(_password=PasswordField)
-    form_widget_args = {
-        'is_extra_encrypted': {'disabled': True},
-        'is_encrypted': {'disabled': True},
-    }
-    # Used to customized the form, the forms elements get rendered
-    # and results are stored in the extra field as json. All of these
-    # need to be prefixed with extra__ and then the conn_type ___ as in
-    # extra__{conn_type}__name. You can also hide form elements and rename
-    # others from the connection_form.js file
-    form_extra_fields = {
-        'extra__jdbc__drv_path' : StringField('Driver Path'),
-        'extra__jdbc__drv_clsname': StringField('Driver Class'),
-        'extra__google_cloud_platform__project': StringField('Project'),
-        'extra__google_cloud_platform__key_path': StringField('Keyfile Path'),
-        'extra__google_cloud_platform__service_account': StringField('Service Account'),
-        'extra__google_cloud_platform__scope': StringField('Scopes (comma seperated)'),
-
-    }
-    form_choices = {
-        'conn_type': [
-            ('bigquery', 'BigQuery',),
-            ('datastore', 'Google Datastore'),
-            ('ftp', 'FTP',),
-            ('google_cloud_storage', 'Google Cloud Storage'),
-            ('google_cloud_platform', 'Google Cloud Platform'),
-            ('hdfs', 'HDFS',),
-            ('http', 'HTTP',),
-            ('hive_cli', 'Hive Client Wrapper',),
-            ('hive_metastore', 'Hive Metastore Thrift',),
-            ('hiveserver2', 'Hive Server 2 Thrift',),
-            ('jdbc', 'Jdbc Connection',),
-            ('mysql', 'MySQL',),
-            ('postgres', 'Postgres',),
-            ('oracle', 'Oracle',),
-            ('vertica', 'Vertica',),
-            ('presto', 'Presto',),
-            ('s3', 'S3',),
-            ('samba', 'Samba',),
-            ('sqlite', 'Sqlite',),
-            ('ssh', 'SSH',),
-            ('cloudant', 'IBM Cloudant',),
-            ('mssql', 'Microsoft SQL Server'),
-            ('mesos_framework-id', 'Mesos Framework ID'),
-        ]
-    }
-
-    def on_model_change(self, form, model, is_created):
-        formdata = form.data
-        if formdata['conn_type'] in ['jdbc', 'google_cloud_platform']:
-            extra = {
-                key:formdata[key]
-                for key in self.form_extra_fields.keys() if key in formdata}
-            model.extra = json.dumps(extra)
-
-    @classmethod
-    def alert_fernet_key(cls):
-        return conf.get('core', 'fernet_key') is None
-
-    @classmethod
-    def is_secure(self):
-        """
-        Used to display a message in the Connection list view making it clear
-        that the passwords and `extra` field can't be encrypted.
-        """
-        is_secure = False
-        try:
-            import cryptography
-            conf.get('core', 'fernet_key')
-            is_secure = True
-        except:
-            pass
-        return is_secure
-
-    def on_form_prefill(self, form, id):
-        try:
-            d = json.loads(form.data.get('extra', '{}'))
-        except Exception as e:
-            d = {}
-
-        for field in list(self.form_extra_fields.keys()):
-            value = d.get(field, '')
-            if value:
-                field = getattr(form, field)
-                field.data = value
-
-
-class UserModelView(wwwutils.SuperUserMixin, AirflowModelView):
-    verbose_name = "User"
-    verbose_name_plural = "Users"
-    column_default_sort = 'username'
-
-
-class ConfigurationView(wwwutils.SuperUserMixin, BaseView):
-    @expose('/')
-    def conf(self):
-        raw = request.args.get('raw') == "true"
-        title = "Airflow Configuration"
-        subtitle = conf.AIRFLOW_CONFIG
-        if conf.getboolean("webserver", "expose_config"):
-            with open(conf.AIRFLOW_CONFIG, 'r') as f:
-                config = f.read()
-        else:
-            config = (
-                "# You Airflow administrator chose not to expose the "
-                "configuration, most likely for security reasons.")
-        if raw:
-            return Response(
-                response=config,
-                status=200,
-                mimetype="application/text")
-        else:
-            code_html = Markup(highlight(
-                config,
-                lexers.IniLexer(),  # Lexer call
-                HtmlFormatter(noclasses=True))
-            )
-            return self.render(
-                'airflow/code.html',
-                pre_subtitle=settings.HEADER + "  v" + airflow.__version__,
-                code_html=code_html, title=title, subtitle=subtitle)
-
-
-class DagModelView(wwwutils.SuperUserMixin, ModelView):
-    column_list = ('dag_id', 'owners')
-    column_editable_list = ('is_paused',)
-    form_excluded_columns = ('is_subdag', 'is_active')
-    column_searchable_list = ('dag_id',)
-    column_filters = (
-        'dag_id', 'owners', 'is_paused', 'is_active', 'is_subdag',
-        'last_scheduler_run', 'last_expired')
-    form_widget_args = {
-        'last_scheduler_run': {'disabled': True},
-        'fileloc': {'disabled': True},
-        'is_paused': {'disabled': True},
-        'last_pickled': {'disabled': True},
-        'pickle_id': {'disabled': True},
-        'last_loaded': {'disabled': True},
-        'last_expired': {'disabled': True},
-        'pickle_size': {'disabled': True},
-        'scheduler_lock': {'disabled': True},
-        'owners': {'disabled': True},
-    }
-    column_formatters = dict(
-        dag_id=dag_link,
-    )
-    can_delete = False
-    can_create = False
-    page_size = 50
-    list_template = 'airflow/list_dags.html'
-    named_filter_urls = True
-
-    def get_query(self):
-        """
-        Default filters for model
-        """
-        return (
-            super(DagModelView, self)
-                .get_query()
-                .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
-                .filter(~models.DagModel.is_subdag)
-        )
-
-    def get_count_query(self):
-        """
-        Default filters for model
-        """
-        return (
-            super(DagModelView, self)
-                .get_count_query()
-                .filter(models.DagModel.is_active)
-                .filter(~models.DagModel.is_subdag)
-        )