You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/06/20 13:37:38 UTC
[8/9] incubator-airflow git commit: [AIRFLOW-6] Remove dependency on
Highcharts
[AIRFLOW-6] Remove dependency on Highcharts
Highcharts' license is not compatible with the Apache 2.0
license. This patch removes Highcharts in favor of d3,
however some charts are not supported anymore.
* This brings Maxime Beauchemin's work to master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0a460081
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0a460081
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0a460081
Branch: refs/heads/master
Commit: 0a460081bc7cba2d05434148f092b87d35aa8cd3
Parents: d243c00
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Mon Jun 20 14:19:34 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Jun 20 14:53:30 2016 +0200
----------------------------------------------------------------------
airflow/www/views.py | 2347 ++
airflow/bin/airflow | 13 +
airflow/executors/base_executor.py | 14 +
airflow/executors/celery_executor.py | 14 +
airflow/hooks/__init__.py | 2 +-
airflow/hooks/dbapi_hook.py | 1 -
airflow/hooks/jdbc_hook.py | 3 +-
airflow/hooks/oracle_hook.py | 1 +
airflow/www/app.py | 4 +-
airflow/www/blueprints.py | 6 -
airflow/www/static/d3.tip.v0.6.3.js | 280 +
airflow/www/static/d3.v3.min.js | 10 +-
airflow/www/static/gantt-chart-d3v2.js | 247 +
airflow/www/static/gantt.css | 38 +
airflow/www/static/highcharts-more.js | 53 -
airflow/www/static/highcharts.js | 308 -
airflow/www/static/nv.d3.css | 769 ++
airflow/www/static/nv.d3.js | 14241 ++++++++++++++++++++
airflow/www/static/nvd3.tar.gz | Bin 0 -> 328377 bytes
airflow/www/templates/airflow/chart.html | 37 +-
airflow/www/templates/airflow/dag.html | 4 +-
airflow/www/templates/airflow/gantt.html | 86 +-
airflow/www/templates/airflow/highchart.html | 183 -
airflow/www/templates/airflow/nvd3.html | 175 +
airflow/www/utils.py | 11 +-
airflow/www/views.py | 352 +-
setup.py | 23 +-
tests/core.py | 19 -
28 files changed, 18315 insertions(+), 926 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a460081/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
new file mode 100644
index 0000000..6331805
--- /dev/null
+++ b/airflow/www/views.py
@@ -0,0 +1,2347 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import sys
+
+import os
+import socket
+import importlib
+
+from functools import wraps
+from datetime import datetime, timedelta
+import dateutil.parser
+import copy
+from itertools import chain, product
+
+from past.utils import old_div
+from past.builtins import basestring
+
+import inspect
+import traceback
+
+import sqlalchemy as sqla
+from sqlalchemy import or_, desc, and_
+
+
+from flask import redirect, url_for, request, Markup, Response, current_app, render_template
+from flask_admin import BaseView, expose, AdminIndexView
+from flask_admin.contrib.sqla import ModelView
+from flask_admin.actions import action
+from flask_login import flash
+from flask._compat import PY2
+
+import jinja2
+import markdown
+import json
+
+from wtforms import (
+ Form, SelectField, TextAreaField, PasswordField, StringField)
+
+from pygments import highlight, lexers
+from pygments.formatters import HtmlFormatter
+
+import airflow
+from airflow import configuration as conf
+from airflow import models
+from airflow import settings
+from airflow.exceptions import AirflowException
+from airflow.settings import Session
+from airflow.models import XCom
+
+from airflow.utils.json import json_ser
+from airflow.utils.state import State
+from airflow.utils.db import provide_session
+from airflow.utils.helpers import alchemy_to_dict
+from airflow.utils import logging as log_utils
+from airflow.www import utils as wwwutils
+from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm
+
# Hard caps on the number of rows fetched for ad-hoc queries and charts.
QUERY_LIMIT = 100000
CHART_LIMIT = 200000

# Module-level DagBag shared by every view in this file.
dagbag = models.DagBag(os.path.expanduser(conf.get('core', 'DAGS_FOLDER')))

# Shortcuts into the pluggable auth backend configured on airflow.login.
login_required = airflow.login.login_required
current_user = airflow.login.current_user
logout_user = airflow.login.logout_user

FILTER_BY_OWNER = False
if conf.getboolean('webserver', 'FILTER_BY_OWNER'):
    # filter_by_owner if authentication is enabled and filter_by_owner is true
    FILTER_BY_OWNER = not current_app.config['LOGIN_DISABLED']
+
+
def dag_link(v, c, m, p):
    """Flask-Admin column formatter: link a row's dag_id to its graph view."""
    graph_url = url_for(
        'airflow.graph',
        dag_id=m.dag_id)
    return Markup(
        '<a href="{url}">{m.dag_id}</a>'.format(url=graph_url, m=m))
+
+
def log_link(v, c, m, p):
    """Flask-Admin column formatter: book icon linking to the TI's log page."""
    log_url = url_for(
        'airflow.log',
        dag_id=m.dag_id,
        task_id=m.task_id,
        execution_date=m.execution_date.isoformat())
    # .format() on a Markup escapes its arguments, keeping the URL safe.
    icon = Markup(
        '<a href="{url}">'
        ' <span class="glyphicon glyphicon-book" aria-hidden="true">'
        '</span></a>')
    return icon.format(url=log_url)
+
+
def task_instance_link(v, c, m, p):
    """Flask-Admin column formatter: task link plus a filter-on-upstream icon."""
    task_url = url_for(
        'airflow.task',
        dag_id=m.dag_id,
        task_id=m.task_id,
        execution_date=m.execution_date.isoformat())
    filter_url = url_for(
        'airflow.graph',
        dag_id=m.dag_id,
        root=m.task_id,
        execution_date=m.execution_date.isoformat())
    return Markup(
        """
        <span style="white-space: nowrap;">
        <a href="{url}">{m.task_id}</a>
        <a href="{url_root}" title="Filter on this task and upstream">
        <span class="glyphicon glyphicon-filter" style="margin-left: 0px;"
            aria-hidden="true"></span>
        </a>
        </span>
        """.format(url=task_url, url_root=filter_url, m=m))
+
+
def state_token(state):
    """Render a state as a colored bootstrap label."""
    markup = (
        '<span class="label" style="background-color:{color};">'
        '{state}</span>')
    return Markup(markup.format(color=State.color(state), state=state))
+
+
def state_f(v, c, m, p):
    # Flask-Admin column formatter: render the row's state as a colored label.
    return state_token(m.state)
+
+
def duration_f(v, c, m, p):
    """Format a finished task instance's duration as a timedelta.

    Returns None when the instance has no end_date or no duration yet.
    """
    if not (m.end_date and m.duration):
        return None
    return timedelta(seconds=m.duration)
+
+
def datetime_f(v, c, m, p):
    """Format a datetime column compactly, hiding the year when current."""
    value = getattr(m, p)
    iso = value.isoformat() if value else ''
    # Timestamps from the current year drop the "YYYY-" prefix.
    if iso[:4] == datetime.now().isoformat()[:4]:
        iso = iso[5:]
    return Markup("<nobr>{}</nobr>".format(iso))
+
+
def nobr_f(v, c, m, p):
    # Wrap the attribute value in <nobr> so admin tables don't line-wrap it.
    return Markup("<nobr>{}</nobr>".format(getattr(m, p)))
+
+
def label_link(v, c, m, p):
    """Render a chart's label as a link to its chart view.

    ``default_params`` is stored as a Python-literal string and is parsed
    into query parameters for the chart URL; anything unparsable falls back
    to no extra parameters.
    """
    import ast
    try:
        # literal_eval only accepts Python literals, unlike the original
        # eval(), which would execute arbitrary code stored in the DB.
        default_params = ast.literal_eval(m.default_params)
        if not isinstance(default_params, dict):
            default_params = {}
    except Exception:
        default_params = {}
    url = url_for(
        'airflow.chart', chart_id=m.id, iteration_no=m.iteration_no,
        **default_params)
    return Markup("<a href='{url}'>{m.label}</a>".format(**locals()))
+
+
def pool_link(v, c, m, p):
    """Link a pool name to the task-instance list filtered on that pool."""
    target = '/admin/taskinstance/?flt1_pool_equals=' + m.pool
    return Markup("<a href='{url}'>{m.pool}</a>".format(url=target, m=m))
+
+
def pygment_html_render(s, lexer=lexers.TextLexer):
    # Syntax-highlight ``s`` with the given Pygments lexer *class* (it is
    # instantiated here) and return HTML with line numbers.
    return highlight(
        s,
        lexer(),
        HtmlFormatter(linenos=True),
    )
+
+
def render(obj, lexer):
    """Highlight a string, or each element of a list/tuple or dict, as HTML."""
    chunks = []
    if isinstance(obj, basestring):
        chunks.append(pygment_html_render(obj, lexer))
    elif isinstance(obj, (tuple, list)):
        for idx, item in enumerate(obj):
            chunks.append("<div>List item #{}</div>".format(idx))
            chunks.append("<div>" + pygment_html_render(item, lexer) + "</div>")
    elif isinstance(obj, dict):
        for key, val in obj.items():
            chunks.append('<div>Dict item "{}"</div>'.format(key))
            chunks.append("<div>" + pygment_html_render(val, lexer) + "</div>")
    return "".join(chunks)
+
+
def wrapped_markdown(s):
    # Render markdown text inside a div styled as rich documentation.
    return '<div class="rich_doc">' + markdown.markdown(s) + "</div>"
+
+
# Maps task attribute names to HTML renderers used on the task-details and
# rendered-template pages; keys match operator template_fields/doc attributes.
attr_renderer = {
    'bash_command': lambda x: render(x, lexers.BashLexer),
    'hql': lambda x: render(x, lexers.SqlLexer),
    'sql': lambda x: render(x, lexers.SqlLexer),
    'doc': lambda x: render(x, lexers.TextLexer),
    'doc_json': lambda x: render(x, lexers.JsonLexer),
    'doc_rst': lambda x: render(x, lexers.RstLexer),
    'doc_yaml': lambda x: render(x, lexers.YamlLexer),
    'doc_md': wrapped_markdown,
    'python_callable': lambda x: render(
        inspect.getsource(x), lexers.PythonLexer),
}
+
+
def data_profiling_required(f):
    """Decorator restricting a view to users with data-profiling access."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Short-circuit: current_user is only consulted when login is enabled.
        allowed = current_app.config['LOGIN_DISABLED'] or (
            not current_user.is_anonymous() and
            current_user.data_profiling())
        if allowed:
            return f(*args, **kwargs)
        flash("This page requires data profiling privileges", "error")
        return redirect(url_for('admin.index'))
    return decorated_function
+
+
def fused_slots(v, c, m, p):
    """Link a pool's used-slot count to its running task instances."""
    target = (
        '/admin/taskinstance/?flt1_pool_equals={0}'
        '&flt2_state_equals=running'.format(m.pool))
    return Markup("<a href='{0}'>{1}</a>".format(target, m.used_slots()))
+
+
def fqueued_slots(v, c, m, p):
    """Link a pool's queued-slot count to its queued task instances."""
    target = (
        '/admin/taskinstance/?flt1_pool_equals={0}'
        '&flt2_state_equals=queued&sort=10&desc=1'.format(m.pool))
    return Markup("<a href='{0}'>{1}</a>".format(target, m.queued_slots()))
+
+
+class Airflow(BaseView):
+
    def is_visible(self):
        # Hide this view from the Flask-Admin menu; it is reached by URL only.
        return False
+
    @expose('/')
    @login_required
    def index(self):
        """Render the DAGs landing page."""
        return self.render('airflow/dags.html')
+
    @expose('/chart_data')
    @data_profiling_required
    @wwwutils.gzipped
    # @cache.cached(timeout=3600, key_prefix=wwwutils.make_cache_key)
    def chart_data(self):
        """Return the JSON (or CSV) payload backing a chart.

        Runs the chart's templated SQL through the chart's connection, then
        shapes the resulting DataFrame into a Highcharts-style ``hc`` config
        plus optional datatable ``data``. Errors are accumulated into
        ``payload['error']`` rather than raised.
        """
        session = settings.Session()
        chart_id = request.args.get('chart_id')
        csv = request.args.get('csv') == "true"
        chart = session.query(models.Chart).filter_by(id=chart_id).first()
        db = session.query(
            models.Connection).filter_by(conn_id=chart.conn_id).first()
        session.expunge_all()
        session.commit()
        session.close()

        payload = {}
        payload['state'] = 'ERROR'
        payload['error'] = ''

        # Processing templated fields
        try:
            # NOTE(review): eval() of a DB-stored string executes arbitrary
            # code; ast.literal_eval would be the safe equivalent here.
            args = eval(chart.default_params)
            if type(args) is not type(dict()):
                raise AirflowException('Not a dict')
        except:
            args = {}
            payload['error'] += (
                "Default params is not valid, string has to evaluate as "
                "a Python dictionary. ")

        # Request args override default params; macros are exposed to Jinja.
        request_dict = {k: request.args.get(k) for k in request.args}
        from airflow import macros
        args.update(request_dict)
        args['macros'] = macros
        sql = jinja2.Template(chart.sql).render(**args)
        label = jinja2.Template(chart.label).render(**args)
        payload['sql_html'] = Markup(highlight(
            sql,
            lexers.SqlLexer(),  # Lexer call
            HtmlFormatter(noclasses=True))
        )
        payload['label'] = label

        import pandas as pd
        pd.set_option('display.max_colwidth', 100)
        hook = db.get_hook()
        try:
            df = hook.get_pandas_df(wwwutils.limit_sql(sql, CHART_LIMIT, conn_type=db.conn_type))
            df = df.fillna(0)
        except Exception as e:
            payload['error'] += "SQL execution failed. Details: " + str(e)

        if csv:
            return Response(
                response=df.to_csv(index=False),
                status=200,
                mimetype="application/text")

        if not payload['error'] and len(df) == CHART_LIMIT:
            payload['warning'] = (
                "Data has been truncated to {0}"
                " rows. Expect incomplete results.").format(CHART_LIMIT)

        # Validate the column count expected by the chosen layout.
        if not payload['error'] and len(df) == 0:
            payload['error'] += "Empty result set. "
        elif (
                not payload['error'] and
                chart.sql_layout == 'series' and
                chart.chart_type != "datatable" and
                len(df.columns) < 3):
            payload['error'] += "SQL needs to return at least 3 columns. "
        elif (
                not payload['error'] and
                chart.sql_layout == 'columns'and
                len(df.columns) < 2):
            payload['error'] += "SQL needs to return at least 2 columns. "
        elif not payload['error']:
            import numpy as np
            chart_type = chart.chart_type

            data = None
            if chart_type == "datatable":
                chart.show_datatable = True
            if chart.show_datatable:
                data = df.to_dict(orient="split")
                data['columns'] = [{'title': c} for c in data['columns']]

            # Trying to convert time to something Highcharts likes
            x_col = 1 if chart.sql_layout == 'series' else 0
            if chart.x_is_date:
                try:
                    # From string to datetime
                    df[df.columns[x_col]] = pd.to_datetime(
                        df[df.columns[x_col]])
                except Exception as e:
                    raise AirflowException(str(e))
                # Epoch milliseconds, the x-axis unit the JS side expects.
                df[df.columns[x_col]] = df[df.columns[x_col]].apply(
                    lambda x: int(x.strftime("%s")) * 1000)

            series = []
            colorAxis = None
            if chart_type == 'datatable':
                payload['data'] = data
                payload['state'] = 'SUCCESS'
                return wwwutils.json_response(payload)

            elif chart_type == 'para':
                # Parallel-coordinates charts consume raw CSV.
                df.rename(columns={
                    df.columns[0]: 'name',
                    df.columns[1]: 'group',
                }, inplace=True)
                return Response(
                    response=df.to_csv(index=False),
                    status=200,
                    mimetype="application/text")

            elif chart_type == 'heatmap':
                color_perc_lbound = float(
                    request.args.get('color_perc_lbound', 0))
                color_perc_rbound = float(
                    request.args.get('color_perc_rbound', 1))
                color_scheme = request.args.get('color_scheme', 'blue_red')

                # Build the color gradient stops for the selected scheme.
                if color_scheme == 'blue_red':
                    stops = [
                        [color_perc_lbound, '#00D1C1'],
                        [
                            color_perc_lbound +
                            ((color_perc_rbound - color_perc_lbound)/2),
                            '#FFFFCC'
                        ],
                        [color_perc_rbound, '#FF5A5F']
                    ]
                elif color_scheme == 'blue_scale':
                    stops = [
                        [color_perc_lbound, '#FFFFFF'],
                        [color_perc_rbound, '#2222FF']
                    ]
                elif color_scheme == 'fire':
                    diff = float(color_perc_rbound - color_perc_lbound)
                    stops = [
                        [color_perc_lbound, '#FFFFFF'],
                        [color_perc_lbound + 0.33*diff, '#FFFF00'],
                        [color_perc_lbound + 0.66*diff, '#FF0000'],
                        [color_perc_rbound, '#000000']
                    ]
                else:
                    stops = [
                        [color_perc_lbound, '#FFFFFF'],
                        [
                            color_perc_lbound +
                            ((color_perc_rbound - color_perc_lbound)/2),
                            '#888888'
                        ],
                        [color_perc_rbound, '#000000'],
                    ]

                xaxis_label = df.columns[1]
                yaxis_label = df.columns[2]
                data = []
                # itertuples yields (index, col0, col1, col2, ...).
                for row in df.itertuples():
                    data.append({
                        'x': row[2],
                        'y': row[3],
                        'value': row[4],
                    })
                x_format = '{point.x:%Y-%m-%d}' \
                    if chart.x_is_date else '{point.x}'
                series.append({
                    'data': data,
                    'borderWidth': 0,
                    'colsize': 24 * 36e5,
                    'turboThreshold': sys.float_info.max,
                    'tooltip': {
                        'headerFormat': '',
                        'pointFormat': (
                            df.columns[1] + ': ' + x_format + '<br/>' +
                            df.columns[2] + ': {point.y}<br/>' +
                            df.columns[3] + ': <b>{point.value}</b>'
                        ),
                    },
                })
                colorAxis = {
                    'stops': stops,
                    'minColor': '#FFFFFF',
                    'maxColor': '#000000',
                    'min': 50,
                    'max': 2200,
                }
            else:
                if chart.sql_layout == 'series':
                    # User provides columns (series, x, y)
                    xaxis_label = df.columns[1]
                    yaxis_label = df.columns[2]
                    df[df.columns[2]] = df[df.columns[2]].astype(np.float)
                    df = df.pivot_table(
                        index=df.columns[1],
                        columns=df.columns[0],
                        values=df.columns[2], aggfunc=np.sum)
                else:
                    # User provides columns (x, y, metric1, metric2, ...)
                    xaxis_label = df.columns[0]
                    yaxis_label = 'y'
                    df.index = df[df.columns[0]]
                    df = df.sort(df.columns[0])
                    del df[df.columns[0]]
                    for col in df.columns:
                        df[col] = df[col].astype(np.float)

                # One series per remaining column, NaN points dropped.
                for col in df.columns:
                    series.append({
                        'name': col,
                        'data': [
                            (k, df[col][k])
                            for k in df[col].keys()
                            if not np.isnan(df[col][k])]
                    })
                series = [serie for serie in sorted(
                    series, key=lambda s: s['data'][0][1], reverse=True)]

            if chart_type == "stacked_area":
                stacking = "normal"
                chart_type = 'area'
            elif chart_type == "percent_area":
                stacking = "percent"
                chart_type = 'area'
            else:
                stacking = None
            # Highcharts-shaped configuration consumed by the chart template.
            hc = {
                'chart': {
                    'type': chart_type
                },
                'plotOptions': {
                    'series': {
                        'marker': {
                            'enabled': False
                        }
                    },
                    'area': {'stacking': stacking},
                },
                'title': {'text': ''},
                'xAxis': {
                    'title': {'text': xaxis_label},
                    'type': 'datetime' if chart.x_is_date else None,
                },
                'yAxis': {
                    'title': {'text': yaxis_label},
                },
                'colorAxis': colorAxis,
                'tooltip': {
                    'useHTML': True,
                    'backgroundColor': None,
                    'borderWidth': 0,
                },
                'series': series,
            }

            if chart.y_log_scale:
                hc['yAxis']['type'] = 'logarithmic'
                hc['yAxis']['minorTickInterval'] = 0.1
                if 'min' in hc['yAxis']:
                    del hc['yAxis']['min']

            payload['state'] = 'SUCCESS'
            payload['hc'] = hc
            payload['data'] = data
            payload['request_dict'] = request_dict
        return wwwutils.json_response(payload)
+
    @expose('/chart')
    @data_profiling_required
    def chart(self):
        """Render a chart page; the data itself is fetched via /chart_data."""
        session = settings.Session()
        chart_id = request.args.get('chart_id')
        embed = request.args.get('embed')
        chart = session.query(models.Chart).filter_by(id=chart_id).first()
        session.expunge_all()
        session.commit()
        session.close()
        if chart.chart_type == 'para':
            # Parallel-coordinates charts use a dedicated template.
            return self.render('airflow/para/para.html', chart=chart)

        sql = ""
        if chart.show_sql:
            sql = Markup(highlight(
                chart.sql,
                lexers.SqlLexer(),  # Lexer call
                HtmlFormatter(noclasses=True))
            )
        return self.render(
            'airflow/highchart.html',
            chart=chart,
            title="Airflow - Chart",
            sql=sql,
            label=chart.label,
            embed=embed)
+
    @expose('/dag_stats')
    #@login_required
    def dag_stats(self):
        """Return per-DAG task-instance counts grouped by state, as JSON.

        Counts task instances from each DAG's running dag_runs or, when none
        are running, from its most recent dag_run.
        """
        states = [
            State.SUCCESS,
            State.RUNNING,
            State.FAILED,
            State.UPSTREAM_FAILED,
            State.UP_FOR_RETRY,
            State.QUEUED,
        ]
        task_ids = []
        dag_ids = []
        for dag in dagbag.dags.values():
            task_ids += dag.task_ids
            if not dag.is_subdag:
                dag_ids.append(dag.dag_id)

        TI = models.TaskInstance
        DagRun = models.DagRun
        session = Session()

        # Most recent dag_run per dag_id.
        LastDagRun = (
            session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
            .group_by(DagRun.dag_id)
            .subquery('last_dag_run')
        )
        # All currently running dag_runs.
        RunningDagRun = (
            session.query(DagRun.dag_id, DagRun.execution_date)
            .filter(DagRun.state == State.RUNNING)
            .subquery('running_dag_run')
        )

        # Select all task_instances from active dag_runs.
        # If no dag_run is active, return task instances from most recent dag_run.
        qry = (
            session.query(TI.dag_id, TI.state, sqla.func.count(TI.task_id))
            .outerjoin(RunningDagRun, and_(
                RunningDagRun.c.dag_id == TI.dag_id,
                RunningDagRun.c.execution_date == TI.execution_date)
            )
            .outerjoin(LastDagRun, and_(
                LastDagRun.c.dag_id == TI.dag_id,
                LastDagRun.c.execution_date == TI.execution_date)
            )
            .filter(TI.task_id.in_(task_ids))
            .filter(TI.dag_id.in_(dag_ids))
            .filter(or_(
                RunningDagRun.c.dag_id != None,
                LastDagRun.c.dag_id != None
            ))
            .group_by(TI.dag_id, TI.state)
        )

        data = {}
        for dag_id, state, count in qry:
            if dag_id not in data:
                data[dag_id] = {}
            data[dag_id][state] = count
        session.commit()
        session.close()

        payload = {}
        for dag in dagbag.dags.values():
            payload[dag.safe_dag_id] = []
            for state in states:
                try:
                    count = data[dag.dag_id][state]
                # NOTE(review): bare except; a missing key simply means zero.
                except:
                    count = 0
                d = {
                    'state': state,
                    'count': count,
                    'dag_id': dag.dag_id,
                    'color': State.color(state)
                }
                payload[dag.safe_dag_id].append(d)
        return wwwutils.json_response(payload)
+
+
    @expose('/code')
    @login_required
    def code(self):
        """Show the source code of a DAG's module, syntax highlighted."""
        dag_id = request.args.get('dag_id')
        dag = dagbag.get_dag(dag_id)
        title = dag_id
        try:
            m = importlib.import_module(dag.module_name)
            code = inspect.getsource(m)
            html_code = highlight(
                code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
        except IOError as e:
            # Source file unreadable: surface the error in place of the code.
            html_code = str(e)

        return self.render(
            'airflow/dag_code.html', html_code=html_code, dag=dag, title=title,
            root=request.args.get('root'),
            demo_mode=conf.getboolean('webserver', 'demo_mode'))
+
+ @expose('/dag_details')
+ @login_required
+ def dag_details(self):
+ dag_id = request.args.get('dag_id')
+ dag = dagbag.get_dag(dag_id)
+ title = "DAG details"
+
+ session = settings.Session()
+ TI = models.TaskInstance
+ states = (
+ session.query(TI.state, sqla.func.count(TI.dag_id))
+ .filter(TI.dag_id == dag_id)
+ .group_by(TI.state)
+ .all()
+ )
+ return self.render(
+ 'airflow/dag_details.html',
+ dag=dag, title=title, states=states, State=State)
+
    @current_app.errorhandler(404)
    def circles(self):
        """Flask 404 handler: render the 'circles' not-found page."""
        return render_template(
            'airflow/circles.html', hostname=socket.gethostname()), 404
+
    @current_app.errorhandler(500)
    def show_traceback(self):
        """Flask 500 handler: render the traceback page with server info."""
        from airflow.utils import asciiart as ascii_
        return render_template(
            'airflow/traceback.html',
            hostname=socket.gethostname(),
            nukular=ascii_.nukular,
            info=traceback.format_exc()), 500
+
+ @expose('/sandbox')
+ @login_required
+ def sandbox(self):
+ title = "Sandbox Suggested Configuration"
+ cfg_loc = conf.AIRFLOW_CONFIG + '.sandbox'
+ f = open(cfg_loc, 'r')
+ config = f.read()
+ f.close()
+ code_html = Markup(highlight(
+ config,
+ lexers.IniLexer(), # Lexer call
+ HtmlFormatter(noclasses=True))
+ )
+ return self.render(
+ 'airflow/code.html',
+ code_html=code_html, title=title, subtitle=cfg_loc)
+
    @expose('/noaccess')
    def noaccess(self):
        """Render the access-denied page."""
        return self.render('airflow/noaccess.html')
+
    @expose('/headers')
    def headers(self):
        """Debug endpoint: echo request headers and current-user flags as JSON."""
        d = {
            'headers': {k: v for k, v in request.headers},
        }
        # Only auth backends whose user model defines these expose them.
        if hasattr(current_user, 'is_superuser'):
            d['is_superuser'] = current_user.is_superuser()
            d['data_profiling'] = current_user.data_profiling()
            d['is_anonymous'] = current_user.is_anonymous()
            d['is_authenticated'] = current_user.is_authenticated()
        if hasattr(current_user, 'username'):
            d['username'] = current_user.username
        return wwwutils.json_response(d)
+
    @expose('/pickle_info')
    def pickle_info(self):
        """Return pickling info for one DAG (``dag_id`` arg) or all DAGs."""
        d = {}
        dag_id = request.args.get('dag_id')
        dags = [dagbag.dags.get(dag_id)] if dag_id else dagbag.dags.values()
        for dag in dags:
            if not dag.is_subdag:
                d[dag.dag_id] = dag.pickle_info()
        return wwwutils.json_response(d)
+
    @expose('/login', methods=['GET', 'POST'])
    def login(self):
        # Delegate to the configured auth backend's login view.
        return airflow.login.login(self, request)
+
    @expose('/logout')
    def logout(self):
        """Log the current user out and return to the admin index."""
        logout_user()
        flash('You have been logged out.')
        return redirect(url_for('admin.index'))
+
    @expose('/rendered')
    @login_required
    @wwwutils.action_logging
    def rendered(self):
        """Show a task instance's rendered template fields."""
        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        execution_date = request.args.get('execution_date')
        dttm = dateutil.parser.parse(execution_date)
        form = DateTimeForm(data={'execution_date': dttm})
        dag = dagbag.get_dag(dag_id)
        # Copy so rendering does not mutate the shared DAG's task object.
        task = copy.copy(dag.get_task(task_id))
        ti = models.TaskInstance(task=task, execution_date=dttm)
        try:
            ti.render_templates()
        except Exception as e:
            flash("Error rendering template: " + str(e), "error")
        title = "Rendered Template"
        html_dict = {}
        for template_field in task.__class__.template_fields:
            content = getattr(task, template_field)
            # Use a field-specific renderer when one exists (e.g. sql, bash).
            if template_field in attr_renderer:
                html_dict[template_field] = attr_renderer[template_field](content)
            else:
                html_dict[template_field] = (
                    "<pre><code>" + str(content) + "</pre></code>")

        return self.render(
            'airflow/ti_code.html',
            html_dict=html_dict,
            dag=dag,
            task_id=task_id,
            execution_date=execution_date,
            form=form,
            title=title,)
+
+ @expose('/log')
+ @login_required
+ @wwwutils.action_logging
+ def log(self):
+ BASE_LOG_FOLDER = os.path.expanduser(
+ conf.get('core', 'BASE_LOG_FOLDER'))
+ dag_id = request.args.get('dag_id')
+ task_id = request.args.get('task_id')
+ execution_date = request.args.get('execution_date')
+ dag = dagbag.get_dag(dag_id)
+ log_relative = "{dag_id}/{task_id}/{execution_date}".format(
+ **locals())
+ loc = os.path.join(BASE_LOG_FOLDER, log_relative)
+ loc = loc.format(**locals())
+ log = ""
+ TI = models.TaskInstance
+ session = Session()
+ dttm = dateutil.parser.parse(execution_date)
+ ti = session.query(TI).filter(
+ TI.dag_id == dag_id, TI.task_id == task_id,
+ TI.execution_date == dttm).first()
+ dttm = dateutil.parser.parse(execution_date)
+ form = DateTimeForm(data={'execution_date': dttm})
+
+ if ti:
+ host = ti.hostname
+ log_loaded = False
+
+ if socket.gethostname() == host:
+ try:
+ f = open(loc)
+ log += "".join(f.readlines())
+ f.close()
+ log_loaded = True
+ except:
+ log = "*** Local log file not found.\n".format(loc)
+ else:
+ WORKER_LOG_SERVER_PORT = \
+ conf.get('celery', 'WORKER_LOG_SERVER_PORT')
+ url = os.path.join(
+ "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
+ ).format(**locals())
+ log += "*** Log file isn't local.\n"
+ log += "*** Fetching here: {url}\n".format(**locals())
+ try:
+ import requests
+ log += '\n' + requests.get(url).text
+ log_loaded = True
+ except:
+ log += "*** Failed to fetch log file from worker.\n".format(
+ **locals())
+
+ if not log_loaded:
+ # load remote logs
+ remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+ remote_log = os.path.join(remote_log_base, log_relative)
+ log += '\n*** Reading remote logs...\n'
+
+ # S3
+ if remote_log.startswith('s3:/'):
+ log += log_utils.S3Log().read(remote_log, return_error=True)
+
+ # GCS
+ elif remote_log.startswith('gs:/'):
+ log += log_utils.GCSLog().read(remote_log, return_error=True)
+
+ # unsupported
+ elif remote_log:
+ log += '*** Unsupported remote log location.'
+
+ session.commit()
+ session.close()
+
+ if PY2 and not isinstance(log, unicode):
+ log = log.decode('utf-8')
+
+ title = "Log"
+
+ return self.render(
+ 'airflow/ti_code.html',
+ code=log, dag=dag, title=title, task_id=task_id,
+ execution_date=execution_date, form=form)
+
    @expose('/task')
    @login_required
    @wwwutils.action_logging
    def task(self):
        """Render the task details page listing operator attributes."""
        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        # Carrying execution_date through, even though it's irrelevant for
        # this context
        execution_date = request.args.get('execution_date')
        dttm = dateutil.parser.parse(execution_date)
        form = DateTimeForm(data={'execution_date': dttm})
        dag = dagbag.get_dag(dag_id)
        if not dag or task_id not in dag.task_ids:
            flash(
                "Task [{}.{}] doesn't seem to exist"
                " at the moment".format(dag_id, task_id),
                "error")
            return redirect('/admin/')
        task = dag.get_task(task_id)
        # Copy before resolving templates so the shared DAG stays untouched.
        task = copy.copy(task)
        task.resolve_template_files()

        attributes = []
        for attr_name in dir(task):
            if not attr_name.startswith('_'):
                attr = getattr(task, attr_name)
                # NOTE(review): type(self.task) is the type of this view's
                # bound method -- presumably used to skip the task object's
                # own methods; confirm before changing.
                if type(attr) != type(self.task) and \
                        attr_name not in attr_renderer:
                    attributes.append((attr_name, str(attr)))

        title = "Task Details"
        # Color coding the special attributes that are code
        special_attrs_rendered = {}
        for attr_name in attr_renderer:
            if hasattr(task, attr_name):
                source = getattr(task, attr_name)
                special_attrs_rendered[attr_name] = attr_renderer[attr_name](source)

        return self.render(
            'airflow/task.html',
            attributes=attributes,
            task_id=task_id,
            execution_date=execution_date,
            special_attrs_rendered=special_attrs_rendered,
            form=form,
            dag=dag, title=title)
+
    @expose('/xcom')
    @login_required
    @wwwutils.action_logging
    def xcom(self):
        """Show the XCom key/value pairs of one task instance."""
        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        # Carrying execution_date through, even though it's irrelevant for
        # this context
        execution_date = request.args.get('execution_date')
        dttm = dateutil.parser.parse(execution_date)
        form = DateTimeForm(data={'execution_date': dttm})
        dag = dagbag.get_dag(dag_id)
        if not dag or task_id not in dag.task_ids:
            flash(
                "Task [{}.{}] doesn't seem to exist"
                " at the moment".format(dag_id, task_id),
                "error")
            return redirect('/admin/')

        session = Session()
        xcomlist = session.query(XCom).filter(
            XCom.dag_id == dag_id, XCom.task_id == task_id,
            XCom.execution_date == dttm).all()

        attributes = []
        for xcom in xcomlist:
            # Keys starting with '_' are treated as internal and hidden.
            if not xcom.key.startswith('_'):
                attributes.append((xcom.key, xcom.value))

        title = "XCom"
        return self.render(
            'airflow/xcom.html',
            attributes=attributes,
            task_id=task_id,
            execution_date=execution_date,
            form=form,
            dag=dag, title=title)
+
    @expose('/run')
    @login_required
    @wwwutils.action_logging
    @wwwutils.notify_owner
    def run(self):
        """Queue one task instance for immediate execution.

        Only works when the CeleryExecutor is configured; otherwise the user
        is flashed an error and redirected back.
        """
        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        origin = request.args.get('origin')
        dag = dagbag.get_dag(dag_id)
        task = dag.get_task(task_id)

        execution_date = request.args.get('execution_date')
        execution_date = dateutil.parser.parse(execution_date)
        force = request.args.get('force') == "true"
        deps = request.args.get('deps') == "true"

        try:
            from airflow.executors import DEFAULT_EXECUTOR as executor
            from airflow.executors import CeleryExecutor
            if not isinstance(executor, CeleryExecutor):
                flash("Only works with the CeleryExecutor, sorry", "error")
                return redirect(origin)
        except ImportError:
            # in case CeleryExecutor cannot be imported it is not active either
            flash("Only works with the CeleryExecutor, sorry", "error")
            return redirect(origin)

        ti = models.TaskInstance(task=task, execution_date=execution_date)
        executor.start()
        executor.queue_task_instance(
            ti, force=force, ignore_dependencies=deps)
        executor.heartbeat()
        flash(
            "Sent {} to the message queue, "
            "it should start any moment now.".format(ti))
        return redirect(origin)
+
    @expose('/clear')
    @login_required
    @wwwutils.action_logging
    @wwwutils.notify_owner
    def clear(self):
        """Clear task instances for a task (optionally up/downstream,
        future/past), with a confirmation page when not yet confirmed."""
        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        origin = request.args.get('origin')
        dag = dagbag.get_dag(dag_id)
        task = dag.get_task(task_id)

        execution_date = request.args.get('execution_date')
        execution_date = dateutil.parser.parse(execution_date)
        confirmed = request.args.get('confirmed') == "true"
        upstream = request.args.get('upstream') == "true"
        downstream = request.args.get('downstream') == "true"
        future = request.args.get('future') == "true"
        past = request.args.get('past') == "true"

        # Narrow the DAG to the selected task and its requested relatives.
        dag = dag.sub_dag(
            task_regex=r"^{0}$".format(task_id),
            include_downstream=downstream,
            include_upstream=upstream)

        # Open-ended ranges when clearing into the future/past.
        end_date = execution_date if not future else None
        start_date = execution_date if not past else None
        if confirmed:
            count = dag.clear(
                start_date=start_date,
                end_date=end_date)

            flash("{0} task instances have been cleared".format(count))
            return redirect(origin)
        else:
            # Dry run first so the user can confirm what would be cleared.
            tis = dag.clear(
                start_date=start_date,
                end_date=end_date,
                dry_run=True)
            if not tis:
                flash("No task instances to clear", 'error')
                response = redirect(origin)
            else:
                details = "\n".join([str(t) for t in tis])

                response = self.render(
                    'airflow/confirm.html',
                    message=(
                        "Here's the list of task instances you are about "
                        "to clear:"),
                    details=details,)

            return response
+
+ @expose('/blocked')
+ @login_required
+ def blocked(self):
+ session = settings.Session()
+ DR = models.DagRun
+ dags = (
+ session.query(DR.dag_id, sqla.func.count(DR.id))
+ .filter(DR.state == State.RUNNING)
+ .group_by(DR.dag_id)
+ .all()
+ )
+ payload = []
+ for dag_id, active_dag_runs in dags:
+ max_active_runs = 0
+ if dag_id in dagbag.dags:
+ max_active_runs = dagbag.dags[dag_id].max_active_runs
+ payload.append({
+ 'dag_id': dag_id,
+ 'active_dag_run': active_dag_runs,
+ 'max_active_runs': max_active_runs,
+ })
+ return wwwutils.json_response(payload)
+
+    @expose('/success')
+    @login_required
+    @wwwutils.action_logging
+    @wwwutils.notify_owner
+    def success(self):
+        """
+        Mark task instances as successful, optionally expanding the selection
+        to upstream/downstream tasks and past/future schedule dates.
+
+        Renders a confirmation page first; only mutates state when the
+        'confirmed' query arg is "true".
+        """
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        origin = request.args.get('origin')
+        dag = dagbag.get_dag(dag_id)
+        task = dag.get_task(task_id)
+
+        execution_date = request.args.get('execution_date')
+        execution_date = dateutil.parser.parse(execution_date)
+        confirmed = request.args.get('confirmed') == "true"
+        upstream = request.args.get('upstream') == "true"
+        downstream = request.args.get('downstream') == "true"
+        future = request.args.get('future') == "true"
+        past = request.args.get('past') == "true"
+        # Hard cap on how many task instances one request may alter.
+        MAX_PERIODS = 1000
+
+        # Flagging tasks as successful
+        session = settings.Session()
+        task_ids = [task_id]
+        # 'future' extends the window to the latest run; otherwise only the
+        # clicked execution date.
+        end_date = ((dag.latest_execution_date or datetime.now())
+                    if future else execution_date)
+
+        if 'start_date' in dag.default_args:
+            start_date = dag.default_args['start_date']
+        elif dag.start_date:
+            start_date = dag.start_date
+        else:
+            start_date = execution_date
+
+        # Only reach back before execution_date when 'past' was requested.
+        start_date = execution_date if not past else start_date
+
+        if downstream:
+            task_ids += [
+                t.task_id
+                for t in task.get_flat_relatives(upstream=False)]
+        if upstream:
+            task_ids += [
+                t.task_id
+                for t in task.get_flat_relatives(upstream=True)]
+        TI = models.TaskInstance
+
+        if dag.schedule_interval == '@once':
+            dates = [start_date]
+        else:
+            dates = dag.date_range(start_date, end_date=end_date)
+
+        # TIs that already exist in the window, regardless of state.
+        tis = session.query(TI).filter(
+            TI.dag_id == dag_id,
+            TI.execution_date.in_(dates),
+            TI.task_id.in_(task_ids)).all()
+        # Existing TIs that are not already successful.
+        tis_to_change = session.query(TI).filter(
+            TI.dag_id == dag_id,
+            TI.execution_date.in_(dates),
+            TI.task_id.in_(task_ids),
+            TI.state != State.SUCCESS).all()
+        # (task_id, execution_date) pairs with no TI row yet.
+        tasks = list(product(task_ids, dates))
+        tis_to_create = list(
+            set(tasks) -
+            set([(ti.task_id, ti.execution_date) for ti in tis]))
+
+        tis_all_altered = list(chain(
+            [(ti.task_id, ti.execution_date) for ti in tis_to_change],
+            tis_to_create))
+
+        if len(tis_all_altered) > MAX_PERIODS:
+            flash("Too many tasks at once (>{0})".format(
+                MAX_PERIODS), 'error')
+            return redirect(origin)
+
+        if confirmed:
+            for ti in tis_to_change:
+                ti.state = State.SUCCESS
+            session.commit()
+
+            for task_id, task_execution_date in tis_to_create:
+                ti = TI(
+                    task=dag.get_task(task_id),
+                    execution_date=task_execution_date,
+                    state=State.SUCCESS)
+                session.add(ti)
+                session.commit()
+
+            session.commit()
+            session.close()
+            flash("Marked success on {} task instances".format(
+                len(tis_all_altered)))
+
+            return redirect(origin)
+        else:
+            if not tis_all_altered:
+                flash("No task instances to mark as successful", 'error')
+                response = redirect(origin)
+            else:
+                # Transient TI objects, built only to render the details list.
+                tis = []
+                for task_id, task_execution_date in tis_all_altered:
+                    tis.append(TI(
+                        task=dag.get_task(task_id),
+                        execution_date=task_execution_date,
+                        state=State.SUCCESS))
+                details = "\n".join([str(t) for t in tis])
+
+                response = self.render(
+                    'airflow/confirm.html',
+                    message=(
+                        "Here's the list of task instances you are about "
+                        "to mark as successful:"),
+                    details=details,)
+            return response
+
+    @expose('/tree')
+    @login_required
+    @wwwutils.gzipped
+    @wwwutils.action_logging
+    def tree(self):
+        """
+        Render the tree view: the DAG's task graph unrolled as a d3 tree,
+        with one instance cell per (task, execution_date) in the window.
+        """
+        dag_id = request.args.get('dag_id')
+        blur = conf.getboolean('webserver', 'demo_mode')
+        dag = dagbag.get_dag(dag_id)
+        root = request.args.get('root')
+        if root:
+            # Restrict the view to the subtree upstream of 'root'.
+            dag = dag.sub_dag(
+                task_regex=root,
+                include_downstream=False,
+                include_upstream=True)
+
+        session = settings.Session()
+
+        base_date = request.args.get('base_date')
+        num_runs = request.args.get('num_runs')
+        num_runs = int(num_runs) if num_runs else 25
+
+        if base_date:
+            base_date = dateutil.parser.parse(base_date)
+        else:
+            base_date = dag.latest_execution_date or datetime.now()
+
+        # Walk num_runs schedule intervals backwards from base_date.
+        dates = dag.date_range(base_date, num=-abs(num_runs))
+        min_date = dates[0] if dates else datetime(2000, 1, 1)
+
+        DR = models.DagRun
+        dag_runs = (
+            session.query(DR)
+            .filter(
+                DR.dag_id==dag.dag_id,
+                DR.execution_date<=base_date,
+                DR.execution_date>=min_date)
+            .all()
+        )
+        dag_runs = {
+            dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
+
+        tis = dag.get_task_instances(
+            session, start_date=min_date, end_date=base_date)
+        dates = sorted(list({ti.execution_date for ti in tis}))
+        max_date = max([ti.execution_date for ti in tis]) if dates else None
+        task_instances = {}
+        for ti in tis:
+            tid = alchemy_to_dict(ti)
+            dr = dag_runs.get(ti.execution_date)
+            tid['external_trigger'] = dr['external_trigger'] if dr else False
+            task_instances[(ti.task_id, ti.execution_date)] = tid
+
+        expanded = []
+        # The default recursion traces every path so that tree view has full
+        # expand/collapse functionality. After 5,000 nodes we stop and fall
+        # back on a quick DFS search for performance. See PR #320.
+        node_count = [0]  # mutable cell so the closure can increment it
+        node_limit = 5000 / max(1, len(dag.roots))
+
+        def recurse_nodes(task, visited):
+            visited.add(task)
+            node_count[0] += 1
+
+            children = [
+                recurse_nodes(t, visited) for t in task.upstream_list
+                if node_count[0] < node_limit or t not in visited]
+
+            # D3 tree uses children vs _children to define what is
+            # expanded or not. The following block makes it such that
+            # repeated nodes are collapsed by default.
+            children_key = 'children'
+            if task.task_id not in expanded:
+                expanded.append(task.task_id)
+            elif children:
+                children_key = "_children"
+
+            return {
+                'name': task.task_id,
+                'instances': [
+                    task_instances.get((task.task_id, d)) or {
+                        'execution_date': d.isoformat(),
+                        'task_id': task.task_id
+                    }
+                    for d in dates],
+                children_key: children,
+                'num_dep': len(task.upstream_list),
+                'operator': task.task_type,
+                'retries': task.retries,
+                'owner': task.owner,
+                'start_date': task.start_date,
+                'end_date': task.end_date,
+                'depends_on_past': task.depends_on_past,
+                'ui_color': task.ui_color,
+            }
+        # Synthetic root joining all of the DAG's root tasks.
+        data = {
+            'name': '[DAG]',
+            'children': [recurse_nodes(t, set()) for t in dag.roots],
+            'instances': [
+                dag_runs.get(d) or {'execution_date': d.isoformat()}
+                for d in dates],
+        }
+
+        data = json.dumps(data, indent=4, default=json_ser)
+        session.commit()
+        session.close()
+
+        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
+                                             'num_runs': num_runs})
+        return self.render(
+            'airflow/tree.html',
+            operators=sorted(
+                list(set([op.__class__ for op in dag.tasks])),
+                key=lambda x: x.__name__
+            ),
+            root=root,
+            form=form,
+            dag=dag, data=data, blur=blur)
+
+    @expose('/graph')
+    @login_required
+    @wwwutils.gzipped
+    @wwwutils.action_logging
+    def graph(self):
+        """
+        Render the graph view: the DAG as nodes/edges (dagre-style JSON)
+        annotated with the state of one DAG run's task instances.
+        """
+        session = settings.Session()
+        dag_id = request.args.get('dag_id')
+        blur = conf.getboolean('webserver', 'demo_mode')
+        arrange = request.args.get('arrange', "LR")
+        dag = dagbag.get_dag(dag_id)
+        if dag_id not in dagbag.dags:
+            flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
+            return redirect('/admin/')
+
+        root = request.args.get('root')
+        if root:
+            dag = dag.sub_dag(
+                task_regex=root,
+                include_upstream=True,
+                include_downstream=False)
+
+        nodes = []
+        edges = []
+        for task in dag.tasks:
+            nodes.append({
+                'id': task.task_id,
+                'value': {
+                    'label': task.task_id,
+                    'labelStyle': "fill:{0};".format(task.ui_fgcolor),
+                    'style': "fill:{0};".format(task.ui_color),
+                }
+            })
+
+        def get_upstream(task):
+            # Recursively collect unique upstream edges starting at a root.
+            for t in task.upstream_list:
+                edge = {
+                    'u': t.task_id,
+                    'v': task.task_id,
+                }
+                if edge not in edges:
+                    edges.append(edge)
+                    get_upstream(t)
+
+        for t in dag.roots:
+            get_upstream(t)
+
+        dttm = request.args.get('execution_date')
+        if dttm:
+            dttm = dateutil.parser.parse(dttm)
+        else:
+            dttm = dag.latest_execution_date or datetime.now().date()
+
+        DR = models.DagRun
+        drs = (
+            session.query(DR)
+            .filter_by(dag_id=dag_id)
+            .order_by(desc(DR.execution_date)).all()
+        )
+        dr_choices = []
+        dr_state = None
+        for dr in drs:
+            dr_choices.append((dr.execution_date.isoformat(), dr.run_id))
+            if dttm == dr.execution_date:
+                dr_state = dr.state
+
+        # Small ad-hoc form: pick the DAG run and the dagre layout direction.
+        class GraphForm(Form):
+            execution_date = SelectField("DAG run", choices=dr_choices)
+            arrange = SelectField("Layout", choices=(
+                ('LR', "Left->Right"),
+                ('RL', "Right->Left"),
+                ('TB', "Top->Bottom"),
+                ('BT', "Bottom->Top"),
+            ))
+        form = GraphForm(
+            data={'execution_date': dttm.isoformat(), 'arrange': arrange})
+
+        task_instances = {
+            ti.task_id: alchemy_to_dict(ti)
+            for ti in dag.get_task_instances(session, dttm, dttm)}
+        tasks = {
+            t.task_id: {
+                'dag_id': t.dag_id,
+                'task_type': t.task_type,
+            }
+            for t in dag.tasks}
+        if not tasks:
+            flash("No tasks found", "error")
+        session.commit()
+        session.close()
+        doc_md = markdown.markdown(dag.doc_md) if hasattr(dag, 'doc_md') else ''
+
+        return self.render(
+            'airflow/graph.html',
+            dag=dag,
+            form=form,
+            width=request.args.get('width', "100%"),
+            height=request.args.get('height', "800"),
+            execution_date=dttm.isoformat(),
+            state_token=state_token(dr_state),
+            doc_md=doc_md,
+            arrange=arrange,
+            operators=sorted(
+                list(set([op.__class__ for op in dag.tasks])),
+                key=lambda x: x.__name__
+            ),
+            blur=blur,
+            root=root or '',
+            task_instances=json.dumps(task_instances, indent=2),
+            tasks=json.dumps(tasks, indent=2),
+            nodes=json.dumps(nodes, indent=2),
+            edges=json.dumps(edges, indent=2),)
+
+    @expose('/duration')
+    @login_required
+    @wwwutils.action_logging
+    def duration(self):
+        """
+        Render a line chart of task durations (in hours) over the last
+        num_runs execution dates, one series per task.
+        """
+        from nvd3 import lineChart
+        import time
+        session = settings.Session()
+        dag_id = request.args.get('dag_id')
+        dag = dagbag.get_dag(dag_id)
+        base_date = request.args.get('base_date')
+        num_runs = request.args.get('num_runs')
+        num_runs = int(num_runs) if num_runs else 25
+
+        if base_date:
+            base_date = dateutil.parser.parse(base_date)
+        else:
+            base_date = dag.latest_execution_date or datetime.now()
+
+        dates = dag.date_range(base_date, num=-abs(num_runs))
+        min_date = dates[0] if dates else datetime(2000, 1, 1)
+
+        root = request.args.get('root')
+        if root:
+            dag = dag.sub_dag(
+                task_regex=root,
+                include_upstream=True,
+                include_downstream=False)
+
+        chart = lineChart(name="lineChart", x_is_date=True, height=750, width=600)
+        for task in dag.tasks:
+            y = []
+            x = []
+            for ti in task.get_task_instances(session, start_date=min_date,
+                                              end_date=base_date):
+                if ti.duration:
+                    # x: epoch millis of the execution date; y: hours.
+                    dttm = int(time.mktime(ti.execution_date.timetuple())) * 1000
+                    x.append(dttm)
+                    y.append(float(ti.duration) / (60*60))
+            if x:
+                chart.add_serie(name=task.task_id, x=x, y=y)
+
+        tis = dag.get_task_instances(
+            session, start_date=min_date, end_date=base_date)
+        dates = sorted(list({ti.execution_date for ti in tis}))
+        max_date = max([ti.execution_date for ti in tis]) if dates else None
+
+        session.commit()
+        session.close()
+
+        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
+                                             'num_runs': num_runs})
+        chart.buildhtml()
+        return self.render(
+            'airflow/chart.html',
+            dag=dag,
+            demo_mode=conf.getboolean('webserver', 'demo_mode'),
+            root=root,
+            form=form,
+            chart=chart,
+        )
+
+    @expose('/landing_times')
+    @login_required
+    @wwwutils.action_logging
+    def landing_times(self):
+        """
+        Render a chart of "landing times": hours between the end of each
+        task instance's schedule interval and the time the task finished.
+        """
+        session = settings.Session()
+        dag_id = request.args.get('dag_id')
+        dag = dagbag.get_dag(dag_id)
+        base_date = request.args.get('base_date')
+        num_runs = request.args.get('num_runs')
+        num_runs = int(num_runs) if num_runs else 25
+
+        if base_date:
+            base_date = dateutil.parser.parse(base_date)
+        else:
+            base_date = dag.latest_execution_date or datetime.now()
+
+        dates = dag.date_range(base_date, num=-abs(num_runs))
+        min_date = dates[0] if dates else datetime(2000, 1, 1)
+
+        root = request.args.get('root')
+        if root:
+            dag = dag.sub_dag(
+                task_regex=root,
+                include_upstream=True,
+                include_downstream=False)
+
+        all_data = []
+        for task in dag.tasks:
+            data = []
+            for ti in task.get_task_instances(session, start_date=min_date,
+                                              end_date=base_date):
+                if ti.end_date:
+                    ts = ti.execution_date
+                    if dag.schedule_interval:
+                        # Measure from the *following* schedule, i.e. the
+                        # moment this interval's data was complete.
+                        ts = dag.following_schedule(ts)
+                    secs = old_div((ti.end_date - ts).total_seconds(), 60*60)
+                    data.append([ti.execution_date.isoformat(), secs])
+            all_data.append({'data': data, 'name': task.task_id})
+
+        tis = dag.get_task_instances(
+            session, start_date=min_date, end_date=base_date)
+        dates = sorted(list({ti.execution_date for ti in tis}))
+        max_date = max([ti.execution_date for ti in tis]) if dates else None
+
+        session.commit()
+        session.close()
+
+        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
+                                             'num_runs': num_runs})
+        return self.render(
+            'airflow/chart.html',
+            dag=dag,
+            data=json.dumps(all_data),
+            height="700px",
+            chart_options={'yAxis': {'title': {'text': 'hours after 00:00'}}},
+            demo_mode=conf.getboolean('webserver', 'demo_mode'),
+            root=root,
+            form=form,
+        )
+
+ @expose('/paused')
+ @login_required
+ @wwwutils.action_logging
+ def paused(self):
+ DagModel = models.DagModel
+ dag_id = request.args.get('dag_id')
+ session = settings.Session()
+ orm_dag = session.query(
+ DagModel).filter(DagModel.dag_id == dag_id).first()
+ if request.args.get('is_paused') == 'false':
+ orm_dag.is_paused = True
+ else:
+ orm_dag.is_paused = False
+ session.merge(orm_dag)
+ session.commit()
+ session.close()
+
+ dagbag.get_dag(dag_id)
+ return "OK"
+
+ @expose('/refresh')
+ @login_required
+ @wwwutils.action_logging
+ def refresh(self):
+ DagModel = models.DagModel
+ dag_id = request.args.get('dag_id')
+ session = settings.Session()
+ orm_dag = session.query(
+ DagModel).filter(DagModel.dag_id == dag_id).first()
+
+ if orm_dag:
+ orm_dag.last_expired = datetime.now()
+ session.merge(orm_dag)
+ session.commit()
+ session.close()
+
+ dagbag.get_dag(dag_id)
+ flash("DAG [{}] is now fresh as a daisy".format(dag_id))
+ return redirect('/')
+
+ @expose('/refresh_all')
+ @login_required
+ @wwwutils.action_logging
+ def refresh_all(self):
+ dagbag.collect_dags(only_if_updated=False)
+ flash("All DAGs are now up to date")
+ return redirect('/')
+
+    @expose('/gantt')
+    @login_required
+    @wwwutils.action_logging
+    def gantt(self):
+        """
+        Render a Gantt chart of one DAG run: a horizontal bar per task
+        instance spanning start_date .. end_date (or now, if still running).
+        """
+        session = settings.Session()
+        dag_id = request.args.get('dag_id')
+        dag = dagbag.get_dag(dag_id)
+        demo_mode = conf.getboolean('webserver', 'demo_mode')
+
+        root = request.args.get('root')
+        if root:
+            dag = dag.sub_dag(
+                task_regex=root,
+                include_upstream=True,
+                include_downstream=False)
+
+        dttm = request.args.get('execution_date')
+        if dttm:
+            dttm = dateutil.parser.parse(dttm)
+        else:
+            dttm = dag.latest_execution_date or datetime.now().date()
+
+        form = DateTimeForm(data={'execution_date': dttm})
+
+        # Only instances that actually started are drawable.
+        tis = [
+            ti
+            for ti in dag.get_task_instances(session, dttm, dttm)
+            if ti.start_date]
+        tis = sorted(tis, key=lambda ti: ti.start_date)
+        tasks = []
+        data = []
+        for i, ti in enumerate(tis):
+            end_date = ti.end_date or datetime.now()
+            tasks += [ti.task_id]
+            color = State.color(ti.state)
+            data.append({
+                'x': i,
+                # NOTE(review): strftime('%s') is a platform-specific
+                # extension (epoch seconds in local time) — confirm on
+                # non-glibc platforms.
+                'low': int(ti.start_date.strftime('%s')) * 1000,
+                'high': int(end_date.strftime('%s')) * 1000,
+                'color': color,
+            })
+        height = (len(tis) * 25) + 50
+        session.commit()
+        session.close()
+
+        # Chart config dict consumed by gantt.html; presumably rendered by
+        # the new d3 gantt chart per this commit — verify against template.
+        hc = {
+            'chart': {
+                'type': 'columnrange',
+                'inverted': True,
+                'height': height,
+            },
+            'xAxis': {'categories': tasks, 'alternateGridColor': '#FAFAFA'},
+            'yAxis': {'type': 'datetime'},
+            'title': {
+                'text': None
+            },
+            'plotOptions': {
+                'series': {
+                    'cursor': 'pointer',
+                    'minPointLength': 4,
+                },
+            },
+            'legend': {
+                'enabled': False
+            },
+            'series': [{
+                'data': data
+            }]
+        }
+        return self.render(
+            'airflow/gantt.html',
+            dag=dag,
+            execution_date=dttm.isoformat(),
+            form=form,
+            hc=json.dumps(hc, indent=4),
+            height=height,
+            demo_mode=demo_mode,
+            root=root,
+        )
+
+ @expose('/object/task_instances')
+ @login_required
+ @wwwutils.action_logging
+ def task_instances(self):
+ session = settings.Session()
+ dag_id = request.args.get('dag_id')
+ dag = dagbag.get_dag(dag_id)
+
+ dttm = request.args.get('execution_date')
+ if dttm:
+ dttm = dateutil.parser.parse(dttm)
+ else:
+ return ("Error: Invalid execution_date")
+
+ task_instances = {
+ ti.task_id: alchemy_to_dict(ti)
+ for ti in dag.get_task_instances(session, dttm, dttm)}
+
+ return json.dumps(task_instances)
+
+ @expose('/variables/<form>', methods=["GET", "POST"])
+ @login_required
+ @wwwutils.action_logging
+ def variables(self, form):
+ try:
+ if request.method == 'POST':
+ data = request.json
+ if data:
+ session = settings.Session()
+ var = models.Variable(key=form, val=json.dumps(data))
+ session.add(var)
+ session.commit()
+ return ""
+ else:
+ return self.render(
+ 'airflow/variables/{}.html'.format(form)
+ )
+ except:
+ return ("Error: form airflow/variables/{}.html "
+ "not found.").format(form), 404
+
+
+class HomeView(AdminIndexView):
+    """Landing page: the DAG list, merged from the DB and the DagBag."""
+
+    @expose("/")
+    @login_required
+    def index(self):
+        """Render the DAG list, flashing any DAG import errors."""
+        session = Session()
+        DM = models.DagModel
+        qry = None
+        # filter the dags if filter_by_owner and current user is not superuser
+        do_filter = FILTER_BY_OWNER and (not current_user.is_superuser())
+        if do_filter:
+            qry = (
+                session.query(DM)
+                .filter(
+                    ~DM.is_subdag, DM.is_active,
+                    DM.owners == current_user.username)
+                .all()
+            )
+        else:
+            qry = session.query(DM).filter(~DM.is_subdag, DM.is_active).all()
+        orm_dags = {dag.dag_id: dag for dag in qry}
+        import_errors = session.query(models.ImportError).all()
+        for ie in import_errors:
+            flash(
+                "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie),
+                "error")
+        session.expunge_all()
+        session.commit()
+        session.close()
+        # In-process DAGs from the DagBag, excluding subdags (parent_dag set).
+        dags = dagbag.dags.values()
+        if do_filter:
+            dags = {
+                dag.dag_id: dag
+                for dag in dags
+                if (
+                    dag.owner == current_user.username and (not dag.parent_dag)
+                )
+            }
+        else:
+            dags = {dag.dag_id: dag for dag in dags if not dag.parent_dag}
+        all_dag_ids = sorted(set(orm_dags.keys()) | set(dags.keys()))
+        return self.render(
+            'airflow/dags.html',
+            dags=dags,
+            orm_dags=orm_dags,
+            all_dag_ids=all_dag_ids)
+
+
+class QueryView(wwwutils.DataProfilingMixin, BaseView):
+    """Ad-hoc SQL runner against any configured connection."""
+
+    @expose('/')
+    @wwwutils.gzipped
+    def query(self):
+        """
+        Run the submitted SQL (truncated at QUERY_LIMIT rows) against the
+        chosen connection and render it as an HTML table or CSV download.
+        """
+        session = settings.Session()
+        dbs = session.query(models.Connection).order_by(
+            models.Connection.conn_id).all()
+        session.expunge_all()
+        db_choices = list(
+            ((db.conn_id, db.conn_id) for db in dbs if db.get_hook()))
+        conn_id_str = request.args.get('conn_id')
+        csv = request.args.get('csv') == "true"
+        sql = request.args.get('sql')
+
+        class QueryForm(Form):
+            conn_id = SelectField("Layout", choices=db_choices)
+            sql = TextAreaField("SQL", widget=wwwutils.AceEditorWidget())
+        data = {
+            'conn_id': conn_id_str,
+            'sql': sql,
+        }
+        results = None
+        has_data = False
+        error = False
+        if conn_id_str:
+            db = [db for db in dbs if db.conn_id == conn_id_str][0]
+            hook = db.get_hook()
+            try:
+                df = hook.get_pandas_df(wwwutils.limit_sql(sql, QUERY_LIMIT, conn_type=db.conn_type))
+                # df = hook.get_pandas_df(sql)
+                has_data = len(df) > 0
+                df = df.fillna('')
+                results = df.to_html(
+                    classes=[
+                        'table', 'table-bordered', 'table-striped', 'no-wrap'],
+                    index=False,
+                    na_rep='',
+                ) if has_data else ''
+            except Exception as e:
+                flash(str(e), 'error')
+                error = True
+
+            if has_data and len(df) == QUERY_LIMIT:
+                flash(
+                    "Query output truncated at " + str(QUERY_LIMIT) +
+                    " rows", 'info')
+
+        if not has_data and error:
+            flash('No data', 'error')
+
+        if csv:
+            # NOTE(review): if the query raised above, df is unbound here
+            # and this raises NameError — confirm and guard upstream.
+            return Response(
+                response=df.to_csv(index=False),
+                status=200,
+                mimetype="application/text")
+
+        form = QueryForm(request.form, data=data)
+        session.commit()
+        session.close()
+        return self.render(
+            'airflow/query.html', form=form,
+            title="Ad Hoc Query",
+            results=results or '',
+            has_data=has_data)
+
+
+class AirflowModelView(ModelView):
+    """Base flask-admin ModelView with Airflow's templates and page size."""
+    list_template = 'airflow/model_list.html'
+    edit_template = 'airflow/model_edit.html'
+    create_template = 'airflow/model_create.html'
+    column_display_actions = True
+    page_size = 500
+
+
+class ModelViewOnly(wwwutils.LoginMixin, AirflowModelView):
+    """
+    Modifying the base ModelView class for non edit, browse only operations
+    """
+    named_filter_urls = True
+    can_create = False
+    can_edit = False
+    can_delete = False
+    column_display_pk = True
+
+
+class PoolModelView(wwwutils.SuperUserMixin, AirflowModelView):
+    """CRUD view for worker slot pools (superuser only)."""
+    column_list = ('pool', 'slots', 'used_slots', 'queued_slots')
+    column_formatters = dict(
+        pool=pool_link, used_slots=fused_slots, queued_slots=fqueued_slots)
+    named_filter_urls = True
+
+
+class SlaMissModelView(wwwutils.SuperUserMixin, ModelViewOnly):
+    """Browse-only view over recorded SLA misses."""
+    verbose_name_plural = "SLA misses"
+    verbose_name = "SLA miss"
+    column_list = (
+        'dag_id', 'task_id', 'execution_date', 'email_sent', 'timestamp')
+    column_formatters = dict(
+        task_id=task_instance_link,
+        execution_date=datetime_f,
+        timestamp=datetime_f,
+        dag_id=dag_link)
+    named_filter_urls = True
+    column_searchable_list = ('dag_id', 'task_id',)
+    column_filters = (
+        'dag_id', 'task_id', 'email_sent', 'timestamp', 'execution_date')
+    form_widget_args = {
+        'email_sent': {'disabled': True},
+        'timestamp': {'disabled': True},
+    }
+
+class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView):
+    """CRUD view for user-defined SQL-backed charts."""
+    verbose_name = "chart"
+    verbose_name_plural = "charts"
+    form_columns = (
+        'label',
+        'owner',
+        'conn_id',
+        'chart_type',
+        'show_datatable',
+        'x_is_date',
+        'y_log_scale',
+        'show_sql',
+        'height',
+        'sql_layout',
+        'sql',
+        'default_params',)
+    column_list = (
+        'label', 'conn_id', 'chart_type', 'owner', 'last_modified',)
+    column_formatters = dict(label=label_link, last_modified=datetime_f)
+    column_default_sort = ('last_modified', True)
+    create_template = 'airflow/chart/create.html'
+    edit_template = 'airflow/chart/edit.html'
+    column_filters = ('label', 'owner.username', 'conn_id')
+    column_searchable_list = ('owner.username', 'label', 'sql')
+    column_descriptions = {
+        'label': "Can include {{ templated_fields }} and {{ macros }}",
+        'chart_type': "The type of chart to be displayed",
+        'sql': "Can include {{ templated_fields }} and {{ macros }}.",
+        'height': "Height of the chart, in pixels.",
+        'conn_id': "Source database to run the query against",
+        'x_is_date': (
+            "Whether the X axis should be casted as a date field. Expect most "
+            "intelligible date formats to get casted properly."
+        ),
+        'owner': (
+            "The chart's owner, mostly used for reference and filtering in "
+            "the list view."
+        ),
+        'show_datatable':
+            "Whether to display an interactive data table under the chart.",
+        'default_params': (
+            'A dictionary of {"key": "values",} that define what the '
+            'templated fields (parameters) values should be by default. '
+            'To be valid, it needs to "eval" as a Python dict. '
+            'The key values will show up in the url\'s querystring '
+            'and can be altered there.'
+        ),
+        'show_sql': "Whether to display the SQL statement as a collapsible "
+            "section in the chart page.",
+        'y_log_scale': "Whether to use a log scale for the Y axis.",
+        'sql_layout': (
+            "Defines the layout of the SQL that the application should "
+            "expect. Depending on the tables you are sourcing from, it may "
+            "make more sense to pivot / unpivot the metrics."
+        ),
+    }
+    column_labels = {
+        'sql': "SQL",
+        'height': "Chart Height",
+        'sql_layout': "SQL Layout",
+        'show_sql': "Display the SQL Statement",
+        'default_params': "Default Parameters",
+    }
+    form_choices = {
+        'chart_type': [
+            ('line', 'Line Chart'),
+            ('spline', 'Spline Chart'),
+            ('bar', 'Bar Chart'),
+            ('para', 'Parallel Coordinates'),
+            ('column', 'Column Chart'),
+            ('area', 'Overlapping Area Chart'),
+            ('stacked_area', 'Stacked Area Chart'),
+            ('percent_area', 'Percent Area Chart'),
+            ('heatmap', 'Heatmap'),
+            ('datatable', 'No chart, data table only'),
+        ],
+        'sql_layout': [
+            ('series', 'SELECT series, x, y FROM ...'),
+            ('columns', 'SELECT x, y (series 1), y (series 2), ... FROM ...'),
+        ],
+        # NOTE(review): this query runs at class-definition (import) time,
+        # so connections added later won't appear until restart — confirm
+        # this is intended.
+        'conn_id': [
+            (c.conn_id, c.conn_id)
+            for c in (
+                Session().query(models.Connection.conn_id)
+                .group_by(models.Connection.conn_id)
+            )
+        ]
+    }
+
+    def on_model_change(self, form, model, is_created=True):
+        """Stamp ownership, bump the iteration counter and last_modified."""
+        if model.iteration_no is None:
+            model.iteration_no = 0
+        else:
+            model.iteration_no += 1
+        if not model.user_id and current_user and hasattr(current_user, 'id'):
+            model.user_id = current_user.id
+        model.last_modified = datetime.now()
+
+
+# NOTE: class name is missing the 'n' of "Known"; kept for backward
+# compatibility with existing references.
+class KnowEventView(wwwutils.DataProfilingMixin, AirflowModelView):
+    """CRUD view for known events (outages, holidays, ...)."""
+    verbose_name = "known event"
+    verbose_name_plural = "known events"
+    form_columns = (
+        'label',
+        'event_type',
+        'start_date',
+        'end_date',
+        'reported_by',
+        'description')
+    column_list = (
+        'label', 'event_type', 'start_date', 'end_date', 'reported_by')
+    column_default_sort = ("start_date", True)
+
+
+class KnowEventTypeView(wwwutils.DataProfilingMixin, AirflowModelView):
+    """Default CRUD view over known event types; no customization needed."""
+    pass
+
+'''
+# For debugging / troubleshooting
+mv = KnowEventTypeView(
+ models.KnownEventType,
+ Session, name="Known Event Types", category="Manage")
+admin.add_view(mv)
+class DagPickleView(SuperUserMixin, ModelView):
+ pass
+mv = DagPickleView(
+ models.DagPickle,
+ Session, name="Pickles", category="Manage")
+admin.add_view(mv)
+'''
+
+
+class VariableView(wwwutils.LoginMixin, AirflowModelView):
+    """CRUD view for Airflow Variables (key/value store)."""
+    verbose_name = "Variable"
+    verbose_name_plural = "Variables"
+    form_columns = (
+        'key',
+        'val',
+    )
+    # 'val' is intentionally excluded from the list view.
+    column_list = ('key', 'is_encrypted',)
+    column_filters = ('key', 'val')
+    column_searchable_list = ('key', 'val')
+    form_widget_args = {
+        'is_encrypted': {'disabled': True},
+        'val': {
+            'rows': 20,
+        }
+    }
+
+
+class JobModelView(ModelViewOnly):
+    """Browse-only view over scheduler/backfill/worker job records."""
+    verbose_name_plural = "jobs"
+    verbose_name = "job"
+    column_default_sort = ('start_date', True)
+    column_filters = (
+        'job_type', 'dag_id', 'state',
+        'unixname', 'hostname', 'start_date', 'end_date', 'latest_heartbeat')
+    column_formatters = dict(
+        start_date=datetime_f,
+        end_date=datetime_f,
+        hostname=nobr_f,
+        state=state_f,
+        latest_heartbeat=datetime_f)
+
+
+class DagRunModelView(ModelViewOnly):
+    """DAG run list with bulk actions to force a run's state."""
+    verbose_name_plural = "DAG Runs"
+    can_delete = True
+    can_edit = True
+    can_create = True
+    column_editable_list = ('state',)
+    verbose_name = "dag run"
+    column_default_sort = ('execution_date', True)
+    form_choices = {
+        'state': [
+            ('success', 'success'),
+            ('running', 'running'),
+            ('failed', 'failed'),
+        ],
+    }
+    column_list = (
+        'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger')
+    column_filters = column_list
+    column_searchable_list = ('dag_id', 'state', 'run_id')
+    column_formatters = dict(
+        execution_date=datetime_f,
+        state=state_f,
+        start_date=datetime_f,
+        dag_id=dag_link)
+
+    @action('set_running', "Set state to 'running'", None)
+    def action_set_running(self, ids):
+        self.set_dagrun_state(ids, State.RUNNING)
+
+    @action('set_failed', "Set state to 'failed'", None)
+    def action_set_failed(self, ids):
+        self.set_dagrun_state(ids, State.FAILED)
+
+    @action('set_success', "Set state to 'success'", None)
+    def action_set_success(self, ids):
+        self.set_dagrun_state(ids, State.SUCCESS)
+
+    @provide_session
+    def set_dagrun_state(self, ids, target_state, session=None):
+        """Set the state of the selected DAG runs, stamping start/end dates."""
+        try:
+            DR = models.DagRun
+            count = 0
+            for dr in session.query(DR).filter(DR.id.in_(ids)).all():
+                count += 1
+                dr.state = target_state
+                # RUNNING restarts the clock; terminal states close it.
+                if target_state == State.RUNNING:
+                    dr.start_date = datetime.now()
+                else:
+                    dr.end_date = datetime.now()
+            session.commit()
+            flash(
+                "{count} dag runs were set to '{target_state}'".format(**locals()))
+        except Exception as ex:
+            if not self.handle_view_exception(ex):
+                raise Exception("Ooops")
+            flash('Failed to set state', 'error')
+
+
+class LogModelView(ModelViewOnly):
+    """Browse-only view over the audit log table."""
+    verbose_name_plural = "logs"
+    verbose_name = "log"
+    column_default_sort = ('dttm', True)
+    column_filters = ('dag_id', 'task_id', 'execution_date')
+    column_formatters = dict(
+        dttm=datetime_f, execution_date=datetime_f, dag_id=dag_link)
+
+
+class TaskInstanceModelView(ModelViewOnly):
+    """Task instance list with bulk actions to force an instance's state."""
+    verbose_name_plural = "task instances"
+    verbose_name = "task instance"
+    column_filters = (
+        'state', 'dag_id', 'task_id', 'execution_date', 'hostname',
+        'queue', 'pool', 'operator', 'start_date', 'end_date')
+    named_filter_urls = True
+    column_formatters = dict(
+        log=log_link, task_id=task_instance_link,
+        hostname=nobr_f,
+        state=state_f,
+        execution_date=datetime_f,
+        start_date=datetime_f,
+        end_date=datetime_f,
+        queued_dttm=datetime_f,
+        dag_id=dag_link, duration=duration_f)
+    column_searchable_list = ('dag_id', 'task_id', 'state')
+    column_default_sort = ('start_date', True)
+    form_choices = {
+        'state': [
+            ('success', 'success'),
+            ('running', 'running'),
+            ('failed', 'failed'),
+        ],
+    }
+    column_list = (
+        'state', 'dag_id', 'task_id', 'execution_date', 'operator',
+        'start_date', 'end_date', 'duration', 'job_id', 'hostname',
+        'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number',
+        'pool', 'log')
+    can_delete = True
+    page_size = 500
+
+    @action('set_running', "Set state to 'running'", None)
+    def action_set_running(self, ids):
+        self.set_task_instance_state(ids, State.RUNNING)
+
+    @action('set_failed', "Set state to 'failed'", None)
+    def action_set_failed(self, ids):
+        self.set_task_instance_state(ids, State.FAILED)
+
+    @action('set_success', "Set state to 'success'", None)
+    def action_set_success(self, ids):
+        self.set_task_instance_state(ids, State.SUCCESS)
+
+    @action('set_retry', "Set state to 'up_for_retry'", None)
+    def action_set_retry(self, ids):
+        self.set_task_instance_state(ids, State.UP_FOR_RETRY)
+
+    @provide_session
+    def set_task_instance_state(self, ids, target_state, session=None):
+        """
+        Set the state of the selected task instances.
+
+        Each id is a 'task_id,dag_id,execution_date' composite string.
+        """
+        try:
+            TI = models.TaskInstance
+            for count, id in enumerate(ids):
+                task_id, dag_id, execution_date = id.split(',')
+                execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S')
+                ti = session.query(TI).filter(TI.task_id == task_id,
+                                              TI.dag_id == dag_id,
+                                              TI.execution_date == execution_date).one()
+                ti.state = target_state
+                # Turn the 0-based index into a processed-rows count.
+                count += 1
+            session.commit()
+            flash(
+                "{count} task instances were set to '{target_state}'".format(**locals()))
+        except Exception as ex:
+            if not self.handle_view_exception(ex):
+                raise Exception("Ooops")
+            flash('Failed to set state', 'error')
+
+
+class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView):
+ create_template = 'airflow/conn_create.html'
+ edit_template = 'airflow/conn_edit.html'
+ list_template = 'airflow/conn_list.html'
+ form_columns = (
+ 'conn_id',
+ 'conn_type',
+ 'host',
+ 'schema',
+ 'login',
+ 'password',
+ 'port',
+ 'extra',
+ 'extra__jdbc__drv_path',
+ 'extra__jdbc__drv_clsname',
+ 'extra__google_cloud_platform__project',
+ 'extra__google_cloud_platform__key_path',
+ 'extra__google_cloud_platform__service_account',
+ 'extra__google_cloud_platform__scope',
+ )
+ verbose_name = "Connection"
+ verbose_name_plural = "Connections"
+ column_default_sort = ('conn_id', False)
+ column_list = ('conn_id', 'conn_type', 'host', 'port', 'is_encrypted', 'is_extra_encrypted',)
+ form_overrides = dict(_password=PasswordField)
+ form_widget_args = {
+ 'is_extra_encrypted': {'disabled': True},
+ 'is_encrypted': {'disabled': True},
+ }
+ # Used to customized the form, the forms elements get rendered
+ # and results are stored in the extra field as json. All of these
+ # need to be prefixed with extra__ and then the conn_type ___ as in
+ # extra__{conn_type}__name. You can also hide form elements and rename
+ # others from the connection_form.js file
+ form_extra_fields = {
+ 'extra__jdbc__drv_path' : StringField('Driver Path'),
+ 'extra__jdbc__drv_clsname': StringField('Driver Class'),
+ 'extra__google_cloud_platform__project': StringField('Project'),
+ 'extra__google_cloud_platform__key_path': StringField('Keyfile Path'),
+ 'extra__google_cloud_platform__service_account': StringField('Service Account'),
+ 'extra__google_cloud_platform__scope': StringField('Scopes (comma seperated)'),
+
+ }
+ form_choices = {
+ 'conn_type': [
+ ('bigquery', 'BigQuery',),
+ ('datastore', 'Google Datastore'),
+ ('ftp', 'FTP',),
+ ('google_cloud_storage', 'Google Cloud Storage'),
+ ('google_cloud_platform', 'Google Cloud Platform'),
+ ('hdfs', 'HDFS',),
+ ('http', 'HTTP',),
+ ('hive_cli', 'Hive Client Wrapper',),
+ ('hive_metastore', 'Hive Metastore Thrift',),
+ ('hiveserver2', 'Hive Server 2 Thrift',),
+ ('jdbc', 'Jdbc Connection',),
+ ('mysql', 'MySQL',),
+ ('postgres', 'Postgres',),
+ ('oracle', 'Oracle',),
+ ('vertica', 'Vertica',),
+ ('presto', 'Presto',),
+ ('s3', 'S3',),
+ ('samba', 'Samba',),
+ ('sqlite', 'Sqlite',),
+ ('ssh', 'SSH',),
+ ('cloudant', 'IBM Cloudant',),
+ ('mssql', 'Microsoft SQL Server'),
+ ('mesos_framework-id', 'Mesos Framework ID'),
+ ]
+ }
+
+ def on_model_change(self, form, model, is_created):
+ formdata = form.data
+ if formdata['conn_type'] in ['jdbc', 'google_cloud_platform']:
+ extra = {
+ key:formdata[key]
+ for key in self.form_extra_fields.keys() if key in formdata}
+ model.extra = json.dumps(extra)
+
+ @classmethod
+ def alert_fernet_key(cls):
+ return conf.get('core', 'fernet_key') is None
+
+ @classmethod
+ def is_secure(self):
+ """
+ Used to display a message in the Connection list view making it clear
+ that the passwords and `extra` field can't be encrypted.
+ """
+ is_secure = False
+ try:
+ import cryptography
+ conf.get('core', 'fernet_key')
+ is_secure = True
+ except:
+ pass
+ return is_secure
+
+ def on_form_prefill(self, form, id):
+ try:
+ d = json.loads(form.data.get('extra', '{}'))
+ except Exception as e:
+ d = {}
+
+ for field in list(self.form_extra_fields.keys()):
+ value = d.get(field, '')
+ if value:
+ field = getattr(form, field)
+ field.data = value
+
+
+class UserModelView(wwwutils.SuperUserMixin, AirflowModelView):
+    """CRUD view for web users (superuser only)."""
+    verbose_name = "User"
+    verbose_name_plural = "Users"
+    column_default_sort = 'username'
+
+
+class ConfigurationView(wwwutils.SuperUserMixin, BaseView):
+ @expose('/')
+ def conf(self):
+ raw = request.args.get('raw') == "true"
+ title = "Airflow Configuration"
+ subtitle = conf.AIRFLOW_CONFIG
+ if conf.getboolean("webserver", "expose_config"):
+ with open(conf.AIRFLOW_CONFIG, 'r') as f:
+ config = f.read()
+ else:
+ config = (
+ "# You Airflow administrator chose not to expose the "
+ "configuration, most likely for security reasons.")
+ if raw:
+ return Response(
+ response=config,
+ status=200,
+ mimetype="application/text")
+ else:
+ code_html = Markup(highlight(
+ config,
+ lexers.IniLexer(), # Lexer call
+ HtmlFormatter(noclasses=True))
+ )
+ return self.render(
+ 'airflow/code.html',
+ pre_subtitle=settings.HEADER + " v" + airflow.__version__,
+ code_html=code_html, title=title, subtitle=subtitle)
+
+
+class DagModelView(wwwutils.SuperUserMixin, ModelView):
+ column_list = ('dag_id', 'owners')
+ column_editable_list = ('is_paused',)
+ form_excluded_columns = ('is_subdag', 'is_active')
+ column_searchable_list = ('dag_id',)
+ column_filters = (
+ 'dag_id', 'owners', 'is_paused', 'is_active', 'is_subdag',
+ 'last_scheduler_run', 'last_expired')
+ form_widget_args = {
+ 'last_scheduler_run': {'disabled': True},
+ 'fileloc': {'disabled': True},
+ 'is_paused': {'disabled': True},
+ 'last_pickled': {'disabled': True},
+ 'pickle_id': {'disabled': True},
+ 'last_loaded': {'disabled': True},
+ 'last_expired': {'disabled': True},
+ 'pickle_size': {'disabled': True},
+ 'scheduler_lock': {'disabled': True},
+ 'owners': {'disabled': True},
+ }
+ column_formatters = dict(
+ dag_id=dag_link,
+ )
+ can_delete = False
+ can_create = False
+ page_size = 50
+ list_template = 'airflow/list_dags.html'
+ named_filter_urls = True
+
+ def get_query(self):
+ """
+ Default filters for model
+ """
+ return (
+ super(DagModelView, self)
+ .get_query()
+ .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
+ .filter(~models.DagModel.is_subdag)
+ )
+
+ def get_count_query(self):
+ """
+ Default filters for model
+ """
+ return (
+ super(DagModelView, self)
+ .get_count_query()
+ .filter(models.DagModel.is_active)
+ .filter(~models.DagModel.is_subdag)
+ )
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a460081/airflow/bin/airflow
----------------------------------------------------------------------
diff --git a/airflow/bin/airflow b/airflow/bin/airflow
index 80f1135..0598596 100755
--- a/airflow/bin/airflow
+++ b/airflow/bin/airflow
@@ -1,4 +1,17 @@
#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import logging
import os
from airflow import configuration
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a460081/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 2e88fa9..ca63443 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -1,3 +1,17 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
from builtins import range
from airflow import configuration
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a460081/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index de56baf..04414fb 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -1,3 +1,17 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
from builtins import object
import logging
import subprocess