You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/06/21 06:48:37 UTC
incubator-airflow git commit: [AIRFLOW-263] Remove temp backtick file
Repository: incubator-airflow
Updated Branches:
refs/heads/master 7992036f5 -> f4c44146c
[AIRFLOW-263] Remove temp backtick file
Closes #1613 from artwr/artwr_remove_backtick_file
[AIRFLOW-263] Remove temp backtick file
This removes the stray file named ` (a single backtick) that was
erroneously introduced by the highcharts refactor.
Solves AIRFLOW-263
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f4c44146
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f4c44146
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f4c44146
Branch: refs/heads/master
Commit: f4c44146cd38afe21c517230bff71d33ab041246
Parents: 7992036
Author: Arthur Wiedmer <ar...@gmail.com>
Authored: Tue Jun 21 08:48:23 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jun 21 08:48:23 2016 +0200
----------------------------------------------------------------------
` | 2347 ----------------------------------------------------------------
1 file changed, 2347 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f4c44146/`
----------------------------------------------------------------------
diff --git a/` b/`
deleted file mode 100644
index 6331805..0000000
--- a/`
+++ /dev/null
@@ -1,2347 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import sys
-
-import os
-import socket
-import importlib
-
-from functools import wraps
-from datetime import datetime, timedelta
-import dateutil.parser
-import copy
-from itertools import chain, product
-
-from past.utils import old_div
-from past.builtins import basestring
-
-import inspect
-import traceback
-
-import sqlalchemy as sqla
-from sqlalchemy import or_, desc, and_
-
-
-from flask import redirect, url_for, request, Markup, Response, current_app, render_template
-from flask_admin import BaseView, expose, AdminIndexView
-from flask_admin.contrib.sqla import ModelView
-from flask_admin.actions import action
-from flask_login import flash
-from flask._compat import PY2
-
-import jinja2
-import markdown
-import json
-
-from wtforms import (
- Form, SelectField, TextAreaField, PasswordField, StringField)
-
-from pygments import highlight, lexers
-from pygments.formatters import HtmlFormatter
-
-import airflow
-from airflow import configuration as conf
-from airflow import models
-from airflow import settings
-from airflow.exceptions import AirflowException
-from airflow.settings import Session
-from airflow.models import XCom
-
-from airflow.utils.json import json_ser
-from airflow.utils.state import State
-from airflow.utils.db import provide_session
-from airflow.utils.helpers import alchemy_to_dict
-from airflow.utils import logging as log_utils
-from airflow.www import utils as wwwutils
-from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm
-
-QUERY_LIMIT = 100000
-CHART_LIMIT = 200000
-
-dagbag = models.DagBag(os.path.expanduser(conf.get('core', 'DAGS_FOLDER')))
-
-login_required = airflow.login.login_required
-current_user = airflow.login.current_user
-logout_user = airflow.login.logout_user
-
-FILTER_BY_OWNER = False
-if conf.getboolean('webserver', 'FILTER_BY_OWNER'):
- # filter_by_owner if authentication is enabled and filter_by_owner is true
- FILTER_BY_OWNER = not current_app.config['LOGIN_DISABLED']
-
-
-def dag_link(v, c, m, p):
- url = url_for(
- 'airflow.graph',
- dag_id=m.dag_id)
- return Markup(
- '<a href="{url}">{m.dag_id}</a>'.format(**locals()))
-
-
-def log_link(v, c, m, p):
- url = url_for(
- 'airflow.log',
- dag_id=m.dag_id,
- task_id=m.task_id,
- execution_date=m.execution_date.isoformat())
- return Markup(
- '<a href="{url}">'
- ' <span class="glyphicon glyphicon-book" aria-hidden="true">'
- '</span></a>').format(**locals())
-
-
-def task_instance_link(v, c, m, p):
- url = url_for(
- 'airflow.task',
- dag_id=m.dag_id,
- task_id=m.task_id,
- execution_date=m.execution_date.isoformat())
- url_root = url_for(
- 'airflow.graph',
- dag_id=m.dag_id,
- root=m.task_id,
- execution_date=m.execution_date.isoformat())
- return Markup(
- """
- <span style="white-space: nowrap;">
- <a href="{url}">{m.task_id}</a>
- <a href="{url_root}" title="Filter on this task and upstream">
- <span class="glyphicon glyphicon-filter" style="margin-left: 0px;"
- aria-hidden="true"></span>
- </a>
- </span>
- """.format(**locals()))
-
-
-def state_token(state):
- color = State.color(state)
- return Markup(
- '<span class="label" style="background-color:{color};">'
- '{state}</span>'.format(**locals()))
-
-
-def state_f(v, c, m, p):
- return state_token(m.state)
-
-
-def duration_f(v, c, m, p):
- if m.end_date and m.duration:
- return timedelta(seconds=m.duration)
-
-
-def datetime_f(v, c, m, p):
- attr = getattr(m, p)
- dttm = attr.isoformat() if attr else ''
- if datetime.now().isoformat()[:4] == dttm[:4]:
- dttm = dttm[5:]
- return Markup("<nobr>{}</nobr>".format(dttm))
-
-
-def nobr_f(v, c, m, p):
- return Markup("<nobr>{}</nobr>".format(getattr(m, p)))
-
-
-def label_link(v, c, m, p):
- try:
- default_params = eval(m.default_params)
- except:
- default_params = {}
- url = url_for(
- 'airflow.chart', chart_id=m.id, iteration_no=m.iteration_no,
- **default_params)
- return Markup("<a href='{url}'>{m.label}</a>".format(**locals()))
-
-
-def pool_link(v, c, m, p):
- url = '/admin/taskinstance/?flt1_pool_equals=' + m.pool
- return Markup("<a href='{url}'>{m.pool}</a>".format(**locals()))
-
-
-def pygment_html_render(s, lexer=lexers.TextLexer):
- return highlight(
- s,
- lexer(),
- HtmlFormatter(linenos=True),
- )
-
-
-def render(obj, lexer):
- out = ""
- if isinstance(obj, basestring):
- out += pygment_html_render(obj, lexer)
- elif isinstance(obj, (tuple, list)):
- for i, s in enumerate(obj):
- out += "<div>List item #{}</div>".format(i)
- out += "<div>" + pygment_html_render(s, lexer) + "</div>"
- elif isinstance(obj, dict):
- for k, v in obj.items():
- out += '<div>Dict item "{}"</div>'.format(k)
- out += "<div>" + pygment_html_render(v, lexer) + "</div>"
- return out
-
-
-def wrapped_markdown(s):
- return '<div class="rich_doc">' + markdown.markdown(s) + "</div>"
-
-
-attr_renderer = {
- 'bash_command': lambda x: render(x, lexers.BashLexer),
- 'hql': lambda x: render(x, lexers.SqlLexer),
- 'sql': lambda x: render(x, lexers.SqlLexer),
- 'doc': lambda x: render(x, lexers.TextLexer),
- 'doc_json': lambda x: render(x, lexers.JsonLexer),
- 'doc_rst': lambda x: render(x, lexers.RstLexer),
- 'doc_yaml': lambda x: render(x, lexers.YamlLexer),
- 'doc_md': wrapped_markdown,
- 'python_callable': lambda x: render(
- inspect.getsource(x), lexers.PythonLexer),
-}
-
-
-def data_profiling_required(f):
- '''
- Decorator for views requiring data profiling access
- '''
- @wraps(f)
- def decorated_function(*args, **kwargs):
- if (
- current_app.config['LOGIN_DISABLED'] or
- (not current_user.is_anonymous() and current_user.data_profiling())
- ):
- return f(*args, **kwargs)
- else:
- flash("This page requires data profiling privileges", "error")
- return redirect(url_for('admin.index'))
- return decorated_function
-
-
-def fused_slots(v, c, m, p):
- url = (
- '/admin/taskinstance/' +
- '?flt1_pool_equals=' + m.pool +
- '&flt2_state_equals=running')
- return Markup("<a href='{0}'>{1}</a>".format(url, m.used_slots()))
-
-
-def fqueued_slots(v, c, m, p):
- url = (
- '/admin/taskinstance/' +
- '?flt1_pool_equals=' + m.pool +
- '&flt2_state_equals=queued&sort=10&desc=1')
- return Markup("<a href='{0}'>{1}</a>".format(url, m.queued_slots()))
-
-
-class Airflow(BaseView):
-
- def is_visible(self):
- return False
-
- @expose('/')
- @login_required
- def index(self):
- return self.render('airflow/dags.html')
-
- @expose('/chart_data')
- @data_profiling_required
- @wwwutils.gzipped
- # @cache.cached(timeout=3600, key_prefix=wwwutils.make_cache_key)
- def chart_data(self):
- session = settings.Session()
- chart_id = request.args.get('chart_id')
- csv = request.args.get('csv') == "true"
- chart = session.query(models.Chart).filter_by(id=chart_id).first()
- db = session.query(
- models.Connection).filter_by(conn_id=chart.conn_id).first()
- session.expunge_all()
- session.commit()
- session.close()
-
- payload = {}
- payload['state'] = 'ERROR'
- payload['error'] = ''
-
- # Processing templated fields
- try:
- args = eval(chart.default_params)
- if type(args) is not type(dict()):
- raise AirflowException('Not a dict')
- except:
- args = {}
- payload['error'] += (
- "Default params is not valid, string has to evaluate as "
- "a Python dictionary. ")
-
- request_dict = {k: request.args.get(k) for k in request.args}
- from airflow import macros
- args.update(request_dict)
- args['macros'] = macros
- sql = jinja2.Template(chart.sql).render(**args)
- label = jinja2.Template(chart.label).render(**args)
- payload['sql_html'] = Markup(highlight(
- sql,
- lexers.SqlLexer(), # Lexer call
- HtmlFormatter(noclasses=True))
- )
- payload['label'] = label
-
- import pandas as pd
- pd.set_option('display.max_colwidth', 100)
- hook = db.get_hook()
- try:
- df = hook.get_pandas_df(wwwutils.limit_sql(sql, CHART_LIMIT, conn_type=db.conn_type))
- df = df.fillna(0)
- except Exception as e:
- payload['error'] += "SQL execution failed. Details: " + str(e)
-
- if csv:
- return Response(
- response=df.to_csv(index=False),
- status=200,
- mimetype="application/text")
-
- if not payload['error'] and len(df) == CHART_LIMIT:
- payload['warning'] = (
- "Data has been truncated to {0}"
- " rows. Expect incomplete results.").format(CHART_LIMIT)
-
- if not payload['error'] and len(df) == 0:
- payload['error'] += "Empty result set. "
- elif (
- not payload['error'] and
- chart.sql_layout == 'series' and
- chart.chart_type != "datatable" and
- len(df.columns) < 3):
- payload['error'] += "SQL needs to return at least 3 columns. "
- elif (
- not payload['error'] and
- chart.sql_layout == 'columns'and
- len(df.columns) < 2):
- payload['error'] += "SQL needs to return at least 2 columns. "
- elif not payload['error']:
- import numpy as np
- chart_type = chart.chart_type
-
- data = None
- if chart_type == "datatable":
- chart.show_datatable = True
- if chart.show_datatable:
- data = df.to_dict(orient="split")
- data['columns'] = [{'title': c} for c in data['columns']]
-
- # Trying to convert time to something Highcharts likes
- x_col = 1 if chart.sql_layout == 'series' else 0
- if chart.x_is_date:
- try:
- # From string to datetime
- df[df.columns[x_col]] = pd.to_datetime(
- df[df.columns[x_col]])
- except Exception as e:
- raise AirflowException(str(e))
- df[df.columns[x_col]] = df[df.columns[x_col]].apply(
- lambda x: int(x.strftime("%s")) * 1000)
-
- series = []
- colorAxis = None
- if chart_type == 'datatable':
- payload['data'] = data
- payload['state'] = 'SUCCESS'
- return wwwutils.json_response(payload)
-
- elif chart_type == 'para':
- df.rename(columns={
- df.columns[0]: 'name',
- df.columns[1]: 'group',
- }, inplace=True)
- return Response(
- response=df.to_csv(index=False),
- status=200,
- mimetype="application/text")
-
- elif chart_type == 'heatmap':
- color_perc_lbound = float(
- request.args.get('color_perc_lbound', 0))
- color_perc_rbound = float(
- request.args.get('color_perc_rbound', 1))
- color_scheme = request.args.get('color_scheme', 'blue_red')
-
- if color_scheme == 'blue_red':
- stops = [
- [color_perc_lbound, '#00D1C1'],
- [
- color_perc_lbound +
- ((color_perc_rbound - color_perc_lbound)/2),
- '#FFFFCC'
- ],
- [color_perc_rbound, '#FF5A5F']
- ]
- elif color_scheme == 'blue_scale':
- stops = [
- [color_perc_lbound, '#FFFFFF'],
- [color_perc_rbound, '#2222FF']
- ]
- elif color_scheme == 'fire':
- diff = float(color_perc_rbound - color_perc_lbound)
- stops = [
- [color_perc_lbound, '#FFFFFF'],
- [color_perc_lbound + 0.33*diff, '#FFFF00'],
- [color_perc_lbound + 0.66*diff, '#FF0000'],
- [color_perc_rbound, '#000000']
- ]
- else:
- stops = [
- [color_perc_lbound, '#FFFFFF'],
- [
- color_perc_lbound +
- ((color_perc_rbound - color_perc_lbound)/2),
- '#888888'
- ],
- [color_perc_rbound, '#000000'],
- ]
-
- xaxis_label = df.columns[1]
- yaxis_label = df.columns[2]
- data = []
- for row in df.itertuples():
- data.append({
- 'x': row[2],
- 'y': row[3],
- 'value': row[4],
- })
- x_format = '{point.x:%Y-%m-%d}' \
- if chart.x_is_date else '{point.x}'
- series.append({
- 'data': data,
- 'borderWidth': 0,
- 'colsize': 24 * 36e5,
- 'turboThreshold': sys.float_info.max,
- 'tooltip': {
- 'headerFormat': '',
- 'pointFormat': (
- df.columns[1] + ': ' + x_format + '<br/>' +
- df.columns[2] + ': {point.y}<br/>' +
- df.columns[3] + ': <b>{point.value}</b>'
- ),
- },
- })
- colorAxis = {
- 'stops': stops,
- 'minColor': '#FFFFFF',
- 'maxColor': '#000000',
- 'min': 50,
- 'max': 2200,
- }
- else:
- if chart.sql_layout == 'series':
- # User provides columns (series, x, y)
- xaxis_label = df.columns[1]
- yaxis_label = df.columns[2]
- df[df.columns[2]] = df[df.columns[2]].astype(np.float)
- df = df.pivot_table(
- index=df.columns[1],
- columns=df.columns[0],
- values=df.columns[2], aggfunc=np.sum)
- else:
- # User provides columns (x, y, metric1, metric2, ...)
- xaxis_label = df.columns[0]
- yaxis_label = 'y'
- df.index = df[df.columns[0]]
- df = df.sort(df.columns[0])
- del df[df.columns[0]]
- for col in df.columns:
- df[col] = df[col].astype(np.float)
-
- for col in df.columns:
- series.append({
- 'name': col,
- 'data': [
- (k, df[col][k])
- for k in df[col].keys()
- if not np.isnan(df[col][k])]
- })
- series = [serie for serie in sorted(
- series, key=lambda s: s['data'][0][1], reverse=True)]
-
- if chart_type == "stacked_area":
- stacking = "normal"
- chart_type = 'area'
- elif chart_type == "percent_area":
- stacking = "percent"
- chart_type = 'area'
- else:
- stacking = None
- hc = {
- 'chart': {
- 'type': chart_type
- },
- 'plotOptions': {
- 'series': {
- 'marker': {
- 'enabled': False
- }
- },
- 'area': {'stacking': stacking},
- },
- 'title': {'text': ''},
- 'xAxis': {
- 'title': {'text': xaxis_label},
- 'type': 'datetime' if chart.x_is_date else None,
- },
- 'yAxis': {
- 'title': {'text': yaxis_label},
- },
- 'colorAxis': colorAxis,
- 'tooltip': {
- 'useHTML': True,
- 'backgroundColor': None,
- 'borderWidth': 0,
- },
- 'series': series,
- }
-
- if chart.y_log_scale:
- hc['yAxis']['type'] = 'logarithmic'
- hc['yAxis']['minorTickInterval'] = 0.1
- if 'min' in hc['yAxis']:
- del hc['yAxis']['min']
-
- payload['state'] = 'SUCCESS'
- payload['hc'] = hc
- payload['data'] = data
- payload['request_dict'] = request_dict
- return wwwutils.json_response(payload)
-
- @expose('/chart')
- @data_profiling_required
- def chart(self):
- session = settings.Session()
- chart_id = request.args.get('chart_id')
- embed = request.args.get('embed')
- chart = session.query(models.Chart).filter_by(id=chart_id).first()
- session.expunge_all()
- session.commit()
- session.close()
- if chart.chart_type == 'para':
- return self.render('airflow/para/para.html', chart=chart)
-
- sql = ""
- if chart.show_sql:
- sql = Markup(highlight(
- chart.sql,
- lexers.SqlLexer(), # Lexer call
- HtmlFormatter(noclasses=True))
- )
- return self.render(
- 'airflow/highchart.html',
- chart=chart,
- title="Airflow - Chart",
- sql=sql,
- label=chart.label,
- embed=embed)
-
- @expose('/dag_stats')
- #@login_required
- def dag_stats(self):
- states = [
- State.SUCCESS,
- State.RUNNING,
- State.FAILED,
- State.UPSTREAM_FAILED,
- State.UP_FOR_RETRY,
- State.QUEUED,
- ]
- task_ids = []
- dag_ids = []
- for dag in dagbag.dags.values():
- task_ids += dag.task_ids
- if not dag.is_subdag:
- dag_ids.append(dag.dag_id)
-
- TI = models.TaskInstance
- DagRun = models.DagRun
- session = Session()
-
- LastDagRun = (
- session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
- .group_by(DagRun.dag_id)
- .subquery('last_dag_run')
- )
- RunningDagRun = (
- session.query(DagRun.dag_id, DagRun.execution_date)
- .filter(DagRun.state == State.RUNNING)
- .subquery('running_dag_run')
- )
-
- # Select all task_instances from active dag_runs.
- # If no dag_run is active, return task instances from most recent dag_run.
- qry = (
- session.query(TI.dag_id, TI.state, sqla.func.count(TI.task_id))
- .outerjoin(RunningDagRun, and_(
- RunningDagRun.c.dag_id == TI.dag_id,
- RunningDagRun.c.execution_date == TI.execution_date)
- )
- .outerjoin(LastDagRun, and_(
- LastDagRun.c.dag_id == TI.dag_id,
- LastDagRun.c.execution_date == TI.execution_date)
- )
- .filter(TI.task_id.in_(task_ids))
- .filter(TI.dag_id.in_(dag_ids))
- .filter(or_(
- RunningDagRun.c.dag_id != None,
- LastDagRun.c.dag_id != None
- ))
- .group_by(TI.dag_id, TI.state)
- )
-
- data = {}
- for dag_id, state, count in qry:
- if dag_id not in data:
- data[dag_id] = {}
- data[dag_id][state] = count
- session.commit()
- session.close()
-
- payload = {}
- for dag in dagbag.dags.values():
- payload[dag.safe_dag_id] = []
- for state in states:
- try:
- count = data[dag.dag_id][state]
- except:
- count = 0
- d = {
- 'state': state,
- 'count': count,
- 'dag_id': dag.dag_id,
- 'color': State.color(state)
- }
- payload[dag.safe_dag_id].append(d)
- return wwwutils.json_response(payload)
-
-
- @expose('/code')
- @login_required
- def code(self):
- dag_id = request.args.get('dag_id')
- dag = dagbag.get_dag(dag_id)
- title = dag_id
- try:
- m = importlib.import_module(dag.module_name)
- code = inspect.getsource(m)
- html_code = highlight(
- code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
- except IOError as e:
- html_code = str(e)
-
- return self.render(
- 'airflow/dag_code.html', html_code=html_code, dag=dag, title=title,
- root=request.args.get('root'),
- demo_mode=conf.getboolean('webserver', 'demo_mode'))
-
- @expose('/dag_details')
- @login_required
- def dag_details(self):
- dag_id = request.args.get('dag_id')
- dag = dagbag.get_dag(dag_id)
- title = "DAG details"
-
- session = settings.Session()
- TI = models.TaskInstance
- states = (
- session.query(TI.state, sqla.func.count(TI.dag_id))
- .filter(TI.dag_id == dag_id)
- .group_by(TI.state)
- .all()
- )
- return self.render(
- 'airflow/dag_details.html',
- dag=dag, title=title, states=states, State=State)
-
- @current_app.errorhandler(404)
- def circles(self):
- return render_template(
- 'airflow/circles.html', hostname=socket.gethostname()), 404
-
- @current_app.errorhandler(500)
- def show_traceback(self):
- from airflow.utils import asciiart as ascii_
- return render_template(
- 'airflow/traceback.html',
- hostname=socket.gethostname(),
- nukular=ascii_.nukular,
- info=traceback.format_exc()), 500
-
- @expose('/sandbox')
- @login_required
- def sandbox(self):
- title = "Sandbox Suggested Configuration"
- cfg_loc = conf.AIRFLOW_CONFIG + '.sandbox'
- f = open(cfg_loc, 'r')
- config = f.read()
- f.close()
- code_html = Markup(highlight(
- config,
- lexers.IniLexer(), # Lexer call
- HtmlFormatter(noclasses=True))
- )
- return self.render(
- 'airflow/code.html',
- code_html=code_html, title=title, subtitle=cfg_loc)
-
- @expose('/noaccess')
- def noaccess(self):
- return self.render('airflow/noaccess.html')
-
- @expose('/headers')
- def headers(self):
- d = {
- 'headers': {k: v for k, v in request.headers},
- }
- if hasattr(current_user, 'is_superuser'):
- d['is_superuser'] = current_user.is_superuser()
- d['data_profiling'] = current_user.data_profiling()
- d['is_anonymous'] = current_user.is_anonymous()
- d['is_authenticated'] = current_user.is_authenticated()
- if hasattr(current_user, 'username'):
- d['username'] = current_user.username
- return wwwutils.json_response(d)
-
- @expose('/pickle_info')
- def pickle_info(self):
- d = {}
- dag_id = request.args.get('dag_id')
- dags = [dagbag.dags.get(dag_id)] if dag_id else dagbag.dags.values()
- for dag in dags:
- if not dag.is_subdag:
- d[dag.dag_id] = dag.pickle_info()
- return wwwutils.json_response(d)
-
- @expose('/login', methods=['GET', 'POST'])
- def login(self):
- return airflow.login.login(self, request)
-
- @expose('/logout')
- def logout(self):
- logout_user()
- flash('You have been logged out.')
- return redirect(url_for('admin.index'))
-
- @expose('/rendered')
- @login_required
- @wwwutils.action_logging
- def rendered(self):
- dag_id = request.args.get('dag_id')
- task_id = request.args.get('task_id')
- execution_date = request.args.get('execution_date')
- dttm = dateutil.parser.parse(execution_date)
- form = DateTimeForm(data={'execution_date': dttm})
- dag = dagbag.get_dag(dag_id)
- task = copy.copy(dag.get_task(task_id))
- ti = models.TaskInstance(task=task, execution_date=dttm)
- try:
- ti.render_templates()
- except Exception as e:
- flash("Error rendering template: " + str(e), "error")
- title = "Rendered Template"
- html_dict = {}
- for template_field in task.__class__.template_fields:
- content = getattr(task, template_field)
- if template_field in attr_renderer:
- html_dict[template_field] = attr_renderer[template_field](content)
- else:
- html_dict[template_field] = (
- "<pre><code>" + str(content) + "</pre></code>")
-
- return self.render(
- 'airflow/ti_code.html',
- html_dict=html_dict,
- dag=dag,
- task_id=task_id,
- execution_date=execution_date,
- form=form,
- title=title,)
-
- @expose('/log')
- @login_required
- @wwwutils.action_logging
- def log(self):
- BASE_LOG_FOLDER = os.path.expanduser(
- conf.get('core', 'BASE_LOG_FOLDER'))
- dag_id = request.args.get('dag_id')
- task_id = request.args.get('task_id')
- execution_date = request.args.get('execution_date')
- dag = dagbag.get_dag(dag_id)
- log_relative = "{dag_id}/{task_id}/{execution_date}".format(
- **locals())
- loc = os.path.join(BASE_LOG_FOLDER, log_relative)
- loc = loc.format(**locals())
- log = ""
- TI = models.TaskInstance
- session = Session()
- dttm = dateutil.parser.parse(execution_date)
- ti = session.query(TI).filter(
- TI.dag_id == dag_id, TI.task_id == task_id,
- TI.execution_date == dttm).first()
- dttm = dateutil.parser.parse(execution_date)
- form = DateTimeForm(data={'execution_date': dttm})
-
- if ti:
- host = ti.hostname
- log_loaded = False
-
- if socket.gethostname() == host:
- try:
- f = open(loc)
- log += "".join(f.readlines())
- f.close()
- log_loaded = True
- except:
- log = "*** Local log file not found.\n".format(loc)
- else:
- WORKER_LOG_SERVER_PORT = \
- conf.get('celery', 'WORKER_LOG_SERVER_PORT')
- url = os.path.join(
- "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
- ).format(**locals())
- log += "*** Log file isn't local.\n"
- log += "*** Fetching here: {url}\n".format(**locals())
- try:
- import requests
- log += '\n' + requests.get(url).text
- log_loaded = True
- except:
- log += "*** Failed to fetch log file from worker.\n".format(
- **locals())
-
- if not log_loaded:
- # load remote logs
- remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
- remote_log = os.path.join(remote_log_base, log_relative)
- log += '\n*** Reading remote logs...\n'
-
- # S3
- if remote_log.startswith('s3:/'):
- log += log_utils.S3Log().read(remote_log, return_error=True)
-
- # GCS
- elif remote_log.startswith('gs:/'):
- log += log_utils.GCSLog().read(remote_log, return_error=True)
-
- # unsupported
- elif remote_log:
- log += '*** Unsupported remote log location.'
-
- session.commit()
- session.close()
-
- if PY2 and not isinstance(log, unicode):
- log = log.decode('utf-8')
-
- title = "Log"
-
- return self.render(
- 'airflow/ti_code.html',
- code=log, dag=dag, title=title, task_id=task_id,
- execution_date=execution_date, form=form)
-
- @expose('/task')
- @login_required
- @wwwutils.action_logging
- def task(self):
- dag_id = request.args.get('dag_id')
- task_id = request.args.get('task_id')
- # Carrying execution_date through, even though it's irrelevant for
- # this context
- execution_date = request.args.get('execution_date')
- dttm = dateutil.parser.parse(execution_date)
- form = DateTimeForm(data={'execution_date': dttm})
- dag = dagbag.get_dag(dag_id)
- if not dag or task_id not in dag.task_ids:
- flash(
- "Task [{}.{}] doesn't seem to exist"
- " at the moment".format(dag_id, task_id),
- "error")
- return redirect('/admin/')
- task = dag.get_task(task_id)
- task = copy.copy(task)
- task.resolve_template_files()
-
- attributes = []
- for attr_name in dir(task):
- if not attr_name.startswith('_'):
- attr = getattr(task, attr_name)
- if type(attr) != type(self.task) and \
- attr_name not in attr_renderer:
- attributes.append((attr_name, str(attr)))
-
- title = "Task Details"
- # Color coding the special attributes that are code
- special_attrs_rendered = {}
- for attr_name in attr_renderer:
- if hasattr(task, attr_name):
- source = getattr(task, attr_name)
- special_attrs_rendered[attr_name] = attr_renderer[attr_name](source)
-
- return self.render(
- 'airflow/task.html',
- attributes=attributes,
- task_id=task_id,
- execution_date=execution_date,
- special_attrs_rendered=special_attrs_rendered,
- form=form,
- dag=dag, title=title)
-
- @expose('/xcom')
- @login_required
- @wwwutils.action_logging
- def xcom(self):
- dag_id = request.args.get('dag_id')
- task_id = request.args.get('task_id')
- # Carrying execution_date through, even though it's irrelevant for
- # this context
- execution_date = request.args.get('execution_date')
- dttm = dateutil.parser.parse(execution_date)
- form = DateTimeForm(data={'execution_date': dttm})
- dag = dagbag.get_dag(dag_id)
- if not dag or task_id not in dag.task_ids:
- flash(
- "Task [{}.{}] doesn't seem to exist"
- " at the moment".format(dag_id, task_id),
- "error")
- return redirect('/admin/')
-
- session = Session()
- xcomlist = session.query(XCom).filter(
- XCom.dag_id == dag_id, XCom.task_id == task_id,
- XCom.execution_date == dttm).all()
-
- attributes = []
- for xcom in xcomlist:
- if not xcom.key.startswith('_'):
- attributes.append((xcom.key, xcom.value))
-
- title = "XCom"
- return self.render(
- 'airflow/xcom.html',
- attributes=attributes,
- task_id=task_id,
- execution_date=execution_date,
- form=form,
- dag=dag, title=title)\
-
- @expose('/run')
- @login_required
- @wwwutils.action_logging
- @wwwutils.notify_owner
- def run(self):
- dag_id = request.args.get('dag_id')
- task_id = request.args.get('task_id')
- origin = request.args.get('origin')
- dag = dagbag.get_dag(dag_id)
- task = dag.get_task(task_id)
-
- execution_date = request.args.get('execution_date')
- execution_date = dateutil.parser.parse(execution_date)
- force = request.args.get('force') == "true"
- deps = request.args.get('deps') == "true"
-
- try:
- from airflow.executors import DEFAULT_EXECUTOR as executor
- from airflow.executors import CeleryExecutor
- if not isinstance(executor, CeleryExecutor):
- flash("Only works with the CeleryExecutor, sorry", "error")
- return redirect(origin)
- except ImportError:
- # in case CeleryExecutor cannot be imported it is not active either
- flash("Only works with the CeleryExecutor, sorry", "error")
- return redirect(origin)
-
- ti = models.TaskInstance(task=task, execution_date=execution_date)
- executor.start()
- executor.queue_task_instance(
- ti, force=force, ignore_dependencies=deps)
- executor.heartbeat()
- flash(
- "Sent {} to the message queue, "
- "it should start any moment now.".format(ti))
- return redirect(origin)
-
- @expose('/clear')
- @login_required
- @wwwutils.action_logging
- @wwwutils.notify_owner
- def clear(self):
- dag_id = request.args.get('dag_id')
- task_id = request.args.get('task_id')
- origin = request.args.get('origin')
- dag = dagbag.get_dag(dag_id)
- task = dag.get_task(task_id)
-
- execution_date = request.args.get('execution_date')
- execution_date = dateutil.parser.parse(execution_date)
- confirmed = request.args.get('confirmed') == "true"
- upstream = request.args.get('upstream') == "true"
- downstream = request.args.get('downstream') == "true"
- future = request.args.get('future') == "true"
- past = request.args.get('past') == "true"
-
- dag = dag.sub_dag(
- task_regex=r"^{0}$".format(task_id),
- include_downstream=downstream,
- include_upstream=upstream)
-
- end_date = execution_date if not future else None
- start_date = execution_date if not past else None
- if confirmed:
- count = dag.clear(
- start_date=start_date,
- end_date=end_date)
-
- flash("{0} task instances have been cleared".format(count))
- return redirect(origin)
- else:
- tis = dag.clear(
- start_date=start_date,
- end_date=end_date,
- dry_run=True)
- if not tis:
- flash("No task instances to clear", 'error')
- response = redirect(origin)
- else:
- details = "\n".join([str(t) for t in tis])
-
- response = self.render(
- 'airflow/confirm.html',
- message=(
- "Here's the list of task instances you are about "
- "to clear:"),
- details=details,)
-
- return response
-
- @expose('/blocked')
- @login_required
- def blocked(self):
- session = settings.Session()
- DR = models.DagRun
- dags = (
- session.query(DR.dag_id, sqla.func.count(DR.id))
- .filter(DR.state == State.RUNNING)
- .group_by(DR.dag_id)
- .all()
- )
- payload = []
- for dag_id, active_dag_runs in dags:
- max_active_runs = 0
- if dag_id in dagbag.dags:
- max_active_runs = dagbag.dags[dag_id].max_active_runs
- payload.append({
- 'dag_id': dag_id,
- 'active_dag_run': active_dag_runs,
- 'max_active_runs': max_active_runs,
- })
- return wwwutils.json_response(payload)
-
- @expose('/success')
- @login_required
- @wwwutils.action_logging
- @wwwutils.notify_owner
- def success(self):
- dag_id = request.args.get('dag_id')
- task_id = request.args.get('task_id')
- origin = request.args.get('origin')
- dag = dagbag.get_dag(dag_id)
- task = dag.get_task(task_id)
-
- execution_date = request.args.get('execution_date')
- execution_date = dateutil.parser.parse(execution_date)
- confirmed = request.args.get('confirmed') == "true"
- upstream = request.args.get('upstream') == "true"
- downstream = request.args.get('downstream') == "true"
- future = request.args.get('future') == "true"
- past = request.args.get('past') == "true"
- MAX_PERIODS = 1000
-
- # Flagging tasks as successful
- session = settings.Session()
- task_ids = [task_id]
- end_date = ((dag.latest_execution_date or datetime.now())
- if future else execution_date)
-
- if 'start_date' in dag.default_args:
- start_date = dag.default_args['start_date']
- elif dag.start_date:
- start_date = dag.start_date
- else:
- start_date = execution_date
-
- start_date = execution_date if not past else start_date
-
- if downstream:
- task_ids += [
- t.task_id
- for t in task.get_flat_relatives(upstream=False)]
- if upstream:
- task_ids += [
- t.task_id
- for t in task.get_flat_relatives(upstream=True)]
- TI = models.TaskInstance
-
- if dag.schedule_interval == '@once':
- dates = [start_date]
- else:
- dates = dag.date_range(start_date, end_date=end_date)
-
- tis = session.query(TI).filter(
- TI.dag_id == dag_id,
- TI.execution_date.in_(dates),
- TI.task_id.in_(task_ids)).all()
- tis_to_change = session.query(TI).filter(
- TI.dag_id == dag_id,
- TI.execution_date.in_(dates),
- TI.task_id.in_(task_ids),
- TI.state != State.SUCCESS).all()
- tasks = list(product(task_ids, dates))
- tis_to_create = list(
- set(tasks) -
- set([(ti.task_id, ti.execution_date) for ti in tis]))
-
- tis_all_altered = list(chain(
- [(ti.task_id, ti.execution_date) for ti in tis_to_change],
- tis_to_create))
-
- if len(tis_all_altered) > MAX_PERIODS:
- flash("Too many tasks at once (>{0})".format(
- MAX_PERIODS), 'error')
- return redirect(origin)
-
- if confirmed:
- for ti in tis_to_change:
- ti.state = State.SUCCESS
- session.commit()
-
- for task_id, task_execution_date in tis_to_create:
- ti = TI(
- task=dag.get_task(task_id),
- execution_date=task_execution_date,
- state=State.SUCCESS)
- session.add(ti)
- session.commit()
-
- session.commit()
- session.close()
- flash("Marked success on {} task instances".format(
- len(tis_all_altered)))
-
- return redirect(origin)
- else:
- if not tis_all_altered:
- flash("No task instances to mark as successful", 'error')
- response = redirect(origin)
- else:
- tis = []
- for task_id, task_execution_date in tis_all_altered:
- tis.append(TI(
- task=dag.get_task(task_id),
- execution_date=task_execution_date,
- state=State.SUCCESS))
- details = "\n".join([str(t) for t in tis])
-
- response = self.render(
- 'airflow/confirm.html',
- message=(
- "Here's the list of task instances you are about "
- "to mark as successful:"),
- details=details,)
- return response
-
- @expose('/tree')
- @login_required
- @wwwutils.gzipped
- @wwwutils.action_logging
- def tree(self):
- dag_id = request.args.get('dag_id')
- blur = conf.getboolean('webserver', 'demo_mode')
- dag = dagbag.get_dag(dag_id)
- root = request.args.get('root')
- if root:
- dag = dag.sub_dag(
- task_regex=root,
- include_downstream=False,
- include_upstream=True)
-
- session = settings.Session()
-
- base_date = request.args.get('base_date')
- num_runs = request.args.get('num_runs')
- num_runs = int(num_runs) if num_runs else 25
-
- if base_date:
- base_date = dateutil.parser.parse(base_date)
- else:
- base_date = dag.latest_execution_date or datetime.now()
-
- dates = dag.date_range(base_date, num=-abs(num_runs))
- min_date = dates[0] if dates else datetime(2000, 1, 1)
-
- DR = models.DagRun
- dag_runs = (
- session.query(DR)
- .filter(
- DR.dag_id==dag.dag_id,
- DR.execution_date<=base_date,
- DR.execution_date>=min_date)
- .all()
- )
- dag_runs = {
- dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
-
- tis = dag.get_task_instances(
- session, start_date=min_date, end_date=base_date)
- dates = sorted(list({ti.execution_date for ti in tis}))
- max_date = max([ti.execution_date for ti in tis]) if dates else None
- task_instances = {}
- for ti in tis:
- tid = alchemy_to_dict(ti)
- dr = dag_runs.get(ti.execution_date)
- tid['external_trigger'] = dr['external_trigger'] if dr else False
- task_instances[(ti.task_id, ti.execution_date)] = tid
-
- expanded = []
- # The default recursion traces every path so that tree view has full
- # expand/collapse functionality. After 5,000 nodes we stop and fall
- # back on a quick DFS search for performance. See PR #320.
- node_count = [0]
- node_limit = 5000 / max(1, len(dag.roots))
-
- def recurse_nodes(task, visited):
- visited.add(task)
- node_count[0] += 1
-
- children = [
- recurse_nodes(t, visited) for t in task.upstream_list
- if node_count[0] < node_limit or t not in visited]
-
- # D3 tree uses children vs _children to define what is
- # expanded or not. The following block makes it such that
- # repeated nodes are collapsed by default.
- children_key = 'children'
- if task.task_id not in expanded:
- expanded.append(task.task_id)
- elif children:
- children_key = "_children"
-
- return {
- 'name': task.task_id,
- 'instances': [
- task_instances.get((task.task_id, d)) or {
- 'execution_date': d.isoformat(),
- 'task_id': task.task_id
- }
- for d in dates],
- children_key: children,
- 'num_dep': len(task.upstream_list),
- 'operator': task.task_type,
- 'retries': task.retries,
- 'owner': task.owner,
- 'start_date': task.start_date,
- 'end_date': task.end_date,
- 'depends_on_past': task.depends_on_past,
- 'ui_color': task.ui_color,
- }
- data = {
- 'name': '[DAG]',
- 'children': [recurse_nodes(t, set()) for t in dag.roots],
- 'instances': [
- dag_runs.get(d) or {'execution_date': d.isoformat()}
- for d in dates],
- }
-
- data = json.dumps(data, indent=4, default=json_ser)
- session.commit()
- session.close()
-
- form = DateTimeWithNumRunsForm(data={'base_date': max_date,
- 'num_runs': num_runs})
- return self.render(
- 'airflow/tree.html',
- operators=sorted(
- list(set([op.__class__ for op in dag.tasks])),
- key=lambda x: x.__name__
- ),
- root=root,
- form=form,
- dag=dag, data=data, blur=blur)
-
- @expose('/graph')
- @login_required
- @wwwutils.gzipped
- @wwwutils.action_logging
- def graph(self):
- session = settings.Session()
- dag_id = request.args.get('dag_id')
- blur = conf.getboolean('webserver', 'demo_mode')
- arrange = request.args.get('arrange', "LR")
- dag = dagbag.get_dag(dag_id)
- if dag_id not in dagbag.dags:
- flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
- return redirect('/admin/')
-
- root = request.args.get('root')
- if root:
- dag = dag.sub_dag(
- task_regex=root,
- include_upstream=True,
- include_downstream=False)
-
- nodes = []
- edges = []
- for task in dag.tasks:
- nodes.append({
- 'id': task.task_id,
- 'value': {
- 'label': task.task_id,
- 'labelStyle': "fill:{0};".format(task.ui_fgcolor),
- 'style': "fill:{0};".format(task.ui_color),
- }
- })
-
- def get_upstream(task):
- for t in task.upstream_list:
- edge = {
- 'u': t.task_id,
- 'v': task.task_id,
- }
- if edge not in edges:
- edges.append(edge)
- get_upstream(t)
-
- for t in dag.roots:
- get_upstream(t)
-
- dttm = request.args.get('execution_date')
- if dttm:
- dttm = dateutil.parser.parse(dttm)
- else:
- dttm = dag.latest_execution_date or datetime.now().date()
-
- DR = models.DagRun
- drs = (
- session.query(DR)
- .filter_by(dag_id=dag_id)
- .order_by(desc(DR.execution_date)).all()
- )
- dr_choices = []
- dr_state = None
- for dr in drs:
- dr_choices.append((dr.execution_date.isoformat(), dr.run_id))
- if dttm == dr.execution_date:
- dr_state = dr.state
-
- class GraphForm(Form):
- execution_date = SelectField("DAG run", choices=dr_choices)
- arrange = SelectField("Layout", choices=(
- ('LR', "Left->Right"),
- ('RL', "Right->Left"),
- ('TB', "Top->Bottom"),
- ('BT', "Bottom->Top"),
- ))
- form = GraphForm(
- data={'execution_date': dttm.isoformat(), 'arrange': arrange})
-
- task_instances = {
- ti.task_id: alchemy_to_dict(ti)
- for ti in dag.get_task_instances(session, dttm, dttm)}
- tasks = {
- t.task_id: {
- 'dag_id': t.dag_id,
- 'task_type': t.task_type,
- }
- for t in dag.tasks}
- if not tasks:
- flash("No tasks found", "error")
- session.commit()
- session.close()
- doc_md = markdown.markdown(dag.doc_md) if hasattr(dag, 'doc_md') else ''
-
- return self.render(
- 'airflow/graph.html',
- dag=dag,
- form=form,
- width=request.args.get('width', "100%"),
- height=request.args.get('height', "800"),
- execution_date=dttm.isoformat(),
- state_token=state_token(dr_state),
- doc_md=doc_md,
- arrange=arrange,
- operators=sorted(
- list(set([op.__class__ for op in dag.tasks])),
- key=lambda x: x.__name__
- ),
- blur=blur,
- root=root or '',
- task_instances=json.dumps(task_instances, indent=2),
- tasks=json.dumps(tasks, indent=2),
- nodes=json.dumps(nodes, indent=2),
- edges=json.dumps(edges, indent=2),)
-
- @expose('/duration')
- @login_required
- @wwwutils.action_logging
- def duration(self):
- from nvd3 import lineChart
- import time
- session = settings.Session()
- dag_id = request.args.get('dag_id')
- dag = dagbag.get_dag(dag_id)
- base_date = request.args.get('base_date')
- num_runs = request.args.get('num_runs')
- num_runs = int(num_runs) if num_runs else 25
-
- if base_date:
- base_date = dateutil.parser.parse(base_date)
- else:
- base_date = dag.latest_execution_date or datetime.now()
-
- dates = dag.date_range(base_date, num=-abs(num_runs))
- min_date = dates[0] if dates else datetime(2000, 1, 1)
-
- root = request.args.get('root')
- if root:
- dag = dag.sub_dag(
- task_regex=root,
- include_upstream=True,
- include_downstream=False)
-
- chart = lineChart(name="lineChart", x_is_date=True, height=750, width=600)
- for task in dag.tasks:
- y = []
- x = []
- for ti in task.get_task_instances(session, start_date=min_date,
- end_date=base_date):
- if ti.duration:
- dttm = int(time.mktime(ti.execution_date.timetuple())) * 1000
- x.append(dttm)
- y.append(float(ti.duration) / (60*60))
- if x:
- chart.add_serie(name=task.task_id, x=x, y=y)
-
- tis = dag.get_task_instances(
- session, start_date=min_date, end_date=base_date)
- dates = sorted(list({ti.execution_date for ti in tis}))
- max_date = max([ti.execution_date for ti in tis]) if dates else None
-
- session.commit()
- session.close()
-
- form = DateTimeWithNumRunsForm(data={'base_date': max_date,
- 'num_runs': num_runs})
- chart.buildhtml()
- return self.render(
- 'airflow/chart.html',
- dag=dag,
- demo_mode=conf.getboolean('webserver', 'demo_mode'),
- root=root,
- form=form,
- chart=chart,
- )
-
- @expose('/landing_times')
- @login_required
- @wwwutils.action_logging
- def landing_times(self):
- session = settings.Session()
- dag_id = request.args.get('dag_id')
- dag = dagbag.get_dag(dag_id)
- base_date = request.args.get('base_date')
- num_runs = request.args.get('num_runs')
- num_runs = int(num_runs) if num_runs else 25
-
- if base_date:
- base_date = dateutil.parser.parse(base_date)
- else:
- base_date = dag.latest_execution_date or datetime.now()
-
- dates = dag.date_range(base_date, num=-abs(num_runs))
- min_date = dates[0] if dates else datetime(2000, 1, 1)
-
- root = request.args.get('root')
- if root:
- dag = dag.sub_dag(
- task_regex=root,
- include_upstream=True,
- include_downstream=False)
-
- all_data = []
- for task in dag.tasks:
- data = []
- for ti in task.get_task_instances(session, start_date=min_date,
- end_date=base_date):
- if ti.end_date:
- ts = ti.execution_date
- if dag.schedule_interval:
- ts = dag.following_schedule(ts)
- secs = old_div((ti.end_date - ts).total_seconds(), 60*60)
- data.append([ti.execution_date.isoformat(), secs])
- all_data.append({'data': data, 'name': task.task_id})
-
- tis = dag.get_task_instances(
- session, start_date=min_date, end_date=base_date)
- dates = sorted(list({ti.execution_date for ti in tis}))
- max_date = max([ti.execution_date for ti in tis]) if dates else None
-
- session.commit()
- session.close()
-
- form = DateTimeWithNumRunsForm(data={'base_date': max_date,
- 'num_runs': num_runs})
- return self.render(
- 'airflow/chart.html',
- dag=dag,
- data=json.dumps(all_data),
- height="700px",
- chart_options={'yAxis': {'title': {'text': 'hours after 00:00'}}},
- demo_mode=conf.getboolean('webserver', 'demo_mode'),
- root=root,
- form=form,
- )
-
- @expose('/paused')
- @login_required
- @wwwutils.action_logging
- def paused(self):
- DagModel = models.DagModel
- dag_id = request.args.get('dag_id')
- session = settings.Session()
- orm_dag = session.query(
- DagModel).filter(DagModel.dag_id == dag_id).first()
- if request.args.get('is_paused') == 'false':
- orm_dag.is_paused = True
- else:
- orm_dag.is_paused = False
- session.merge(orm_dag)
- session.commit()
- session.close()
-
- dagbag.get_dag(dag_id)
- return "OK"
-
- @expose('/refresh')
- @login_required
- @wwwutils.action_logging
- def refresh(self):
- DagModel = models.DagModel
- dag_id = request.args.get('dag_id')
- session = settings.Session()
- orm_dag = session.query(
- DagModel).filter(DagModel.dag_id == dag_id).first()
-
- if orm_dag:
- orm_dag.last_expired = datetime.now()
- session.merge(orm_dag)
- session.commit()
- session.close()
-
- dagbag.get_dag(dag_id)
- flash("DAG [{}] is now fresh as a daisy".format(dag_id))
- return redirect('/')
-
- @expose('/refresh_all')
- @login_required
- @wwwutils.action_logging
- def refresh_all(self):
- dagbag.collect_dags(only_if_updated=False)
- flash("All DAGs are now up to date")
- return redirect('/')
-
- @expose('/gantt')
- @login_required
- @wwwutils.action_logging
- def gantt(self):
-
- session = settings.Session()
- dag_id = request.args.get('dag_id')
- dag = dagbag.get_dag(dag_id)
- demo_mode = conf.getboolean('webserver', 'demo_mode')
-
- root = request.args.get('root')
- if root:
- dag = dag.sub_dag(
- task_regex=root,
- include_upstream=True,
- include_downstream=False)
-
- dttm = request.args.get('execution_date')
- if dttm:
- dttm = dateutil.parser.parse(dttm)
- else:
- dttm = dag.latest_execution_date or datetime.now().date()
-
- form = DateTimeForm(data={'execution_date': dttm})
-
- tis = [
- ti
- for ti in dag.get_task_instances(session, dttm, dttm)
- if ti.start_date]
- tis = sorted(tis, key=lambda ti: ti.start_date)
- tasks = []
- data = []
- for i, ti in enumerate(tis):
- end_date = ti.end_date or datetime.now()
- tasks += [ti.task_id]
- color = State.color(ti.state)
- data.append({
- 'x': i,
- 'low': int(ti.start_date.strftime('%s')) * 1000,
- 'high': int(end_date.strftime('%s')) * 1000,
- 'color': color,
- })
- height = (len(tis) * 25) + 50
- session.commit()
- session.close()
-
- hc = {
- 'chart': {
- 'type': 'columnrange',
- 'inverted': True,
- 'height': height,
- },
- 'xAxis': {'categories': tasks, 'alternateGridColor': '#FAFAFA'},
- 'yAxis': {'type': 'datetime'},
- 'title': {
- 'text': None
- },
- 'plotOptions': {
- 'series': {
- 'cursor': 'pointer',
- 'minPointLength': 4,
- },
- },
- 'legend': {
- 'enabled': False
- },
- 'series': [{
- 'data': data
- }]
- }
- return self.render(
- 'airflow/gantt.html',
- dag=dag,
- execution_date=dttm.isoformat(),
- form=form,
- hc=json.dumps(hc, indent=4),
- height=height,
- demo_mode=demo_mode,
- root=root,
- )
-
- @expose('/object/task_instances')
- @login_required
- @wwwutils.action_logging
- def task_instances(self):
- session = settings.Session()
- dag_id = request.args.get('dag_id')
- dag = dagbag.get_dag(dag_id)
-
- dttm = request.args.get('execution_date')
- if dttm:
- dttm = dateutil.parser.parse(dttm)
- else:
- return ("Error: Invalid execution_date")
-
- task_instances = {
- ti.task_id: alchemy_to_dict(ti)
- for ti in dag.get_task_instances(session, dttm, dttm)}
-
- return json.dumps(task_instances)
-
- @expose('/variables/<form>', methods=["GET", "POST"])
- @login_required
- @wwwutils.action_logging
- def variables(self, form):
- try:
- if request.method == 'POST':
- data = request.json
- if data:
- session = settings.Session()
- var = models.Variable(key=form, val=json.dumps(data))
- session.add(var)
- session.commit()
- return ""
- else:
- return self.render(
- 'airflow/variables/{}.html'.format(form)
- )
- except:
- return ("Error: form airflow/variables/{}.html "
- "not found.").format(form), 404
-
-
-class HomeView(AdminIndexView):
- @expose("/")
- @login_required
- def index(self):
- session = Session()
- DM = models.DagModel
- qry = None
- # filter the dags if filter_by_owner and current user is not superuser
- do_filter = FILTER_BY_OWNER and (not current_user.is_superuser())
- if do_filter:
- qry = (
- session.query(DM)
- .filter(
- ~DM.is_subdag, DM.is_active,
- DM.owners == current_user.username)
- .all()
- )
- else:
- qry = session.query(DM).filter(~DM.is_subdag, DM.is_active).all()
- orm_dags = {dag.dag_id: dag for dag in qry}
- import_errors = session.query(models.ImportError).all()
- for ie in import_errors:
- flash(
- "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie),
- "error")
- session.expunge_all()
- session.commit()
- session.close()
- dags = dagbag.dags.values()
- if do_filter:
- dags = {
- dag.dag_id: dag
- for dag in dags
- if (
- dag.owner == current_user.username and (not dag.parent_dag)
- )
- }
- else:
- dags = {dag.dag_id: dag for dag in dags if not dag.parent_dag}
- all_dag_ids = sorted(set(orm_dags.keys()) | set(dags.keys()))
- return self.render(
- 'airflow/dags.html',
- dags=dags,
- orm_dags=orm_dags,
- all_dag_ids=all_dag_ids)
-
-
-class QueryView(wwwutils.DataProfilingMixin, BaseView):
- @expose('/')
- @wwwutils.gzipped
- def query(self):
- session = settings.Session()
- dbs = session.query(models.Connection).order_by(
- models.Connection.conn_id).all()
- session.expunge_all()
- db_choices = list(
- ((db.conn_id, db.conn_id) for db in dbs if db.get_hook()))
- conn_id_str = request.args.get('conn_id')
- csv = request.args.get('csv') == "true"
- sql = request.args.get('sql')
-
- class QueryForm(Form):
- conn_id = SelectField("Layout", choices=db_choices)
- sql = TextAreaField("SQL", widget=wwwutils.AceEditorWidget())
- data = {
- 'conn_id': conn_id_str,
- 'sql': sql,
- }
- results = None
- has_data = False
- error = False
- if conn_id_str:
- db = [db for db in dbs if db.conn_id == conn_id_str][0]
- hook = db.get_hook()
- try:
- df = hook.get_pandas_df(wwwutils.limit_sql(sql, QUERY_LIMIT, conn_type=db.conn_type))
- # df = hook.get_pandas_df(sql)
- has_data = len(df) > 0
- df = df.fillna('')
- results = df.to_html(
- classes=[
- 'table', 'table-bordered', 'table-striped', 'no-wrap'],
- index=False,
- na_rep='',
- ) if has_data else ''
- except Exception as e:
- flash(str(e), 'error')
- error = True
-
- if has_data and len(df) == QUERY_LIMIT:
- flash(
- "Query output truncated at " + str(QUERY_LIMIT) +
- " rows", 'info')
-
- if not has_data and error:
- flash('No data', 'error')
-
- if csv:
- return Response(
- response=df.to_csv(index=False),
- status=200,
- mimetype="application/text")
-
- form = QueryForm(request.form, data=data)
- session.commit()
- session.close()
- return self.render(
- 'airflow/query.html', form=form,
- title="Ad Hoc Query",
- results=results or '',
- has_data=has_data)
-
-
-class AirflowModelView(ModelView):
- list_template = 'airflow/model_list.html'
- edit_template = 'airflow/model_edit.html'
- create_template = 'airflow/model_create.html'
- column_display_actions = True
- page_size = 500
-
-
-class ModelViewOnly(wwwutils.LoginMixin, AirflowModelView):
- """
- Modifying the base ModelView class for non edit, browse only operations
- """
- named_filter_urls = True
- can_create = False
- can_edit = False
- can_delete = False
- column_display_pk = True
-
-
-class PoolModelView(wwwutils.SuperUserMixin, AirflowModelView):
- column_list = ('pool', 'slots', 'used_slots', 'queued_slots')
- column_formatters = dict(
- pool=pool_link, used_slots=fused_slots, queued_slots=fqueued_slots)
- named_filter_urls = True
-
-
-class SlaMissModelView(wwwutils.SuperUserMixin, ModelViewOnly):
- verbose_name_plural = "SLA misses"
- verbose_name = "SLA miss"
- column_list = (
- 'dag_id', 'task_id', 'execution_date', 'email_sent', 'timestamp')
- column_formatters = dict(
- task_id=task_instance_link,
- execution_date=datetime_f,
- timestamp=datetime_f,
- dag_id=dag_link)
- named_filter_urls = True
- column_searchable_list = ('dag_id', 'task_id',)
- column_filters = (
- 'dag_id', 'task_id', 'email_sent', 'timestamp', 'execution_date')
- form_widget_args = {
- 'email_sent': {'disabled': True},
- 'timestamp': {'disabled': True},
- }
-
-class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView):
- verbose_name = "chart"
- verbose_name_plural = "charts"
- form_columns = (
- 'label',
- 'owner',
- 'conn_id',
- 'chart_type',
- 'show_datatable',
- 'x_is_date',
- 'y_log_scale',
- 'show_sql',
- 'height',
- 'sql_layout',
- 'sql',
- 'default_params',)
- column_list = (
- 'label', 'conn_id', 'chart_type', 'owner', 'last_modified',)
- column_formatters = dict(label=label_link, last_modified=datetime_f)
- column_default_sort = ('last_modified', True)
- create_template = 'airflow/chart/create.html'
- edit_template = 'airflow/chart/edit.html'
- column_filters = ('label', 'owner.username', 'conn_id')
- column_searchable_list = ('owner.username', 'label', 'sql')
- column_descriptions = {
- 'label': "Can include {{ templated_fields }} and {{ macros }}",
- 'chart_type': "The type of chart to be displayed",
- 'sql': "Can include {{ templated_fields }} and {{ macros }}.",
- 'height': "Height of the chart, in pixels.",
- 'conn_id': "Source database to run the query against",
- 'x_is_date': (
- "Whether the X axis should be casted as a date field. Expect most "
- "intelligible date formats to get casted properly."
- ),
- 'owner': (
- "The chart's owner, mostly used for reference and filtering in "
- "the list view."
- ),
- 'show_datatable':
- "Whether to display an interactive data table under the chart.",
- 'default_params': (
- 'A dictionary of {"key": "values",} that define what the '
- 'templated fields (parameters) values should be by default. '
- 'To be valid, it needs to "eval" as a Python dict. '
- 'The key values will show up in the url\'s querystring '
- 'and can be altered there.'
- ),
- 'show_sql': "Whether to display the SQL statement as a collapsible "
- "section in the chart page.",
- 'y_log_scale': "Whether to use a log scale for the Y axis.",
- 'sql_layout': (
- "Defines the layout of the SQL that the application should "
- "expect. Depending on the tables you are sourcing from, it may "
- "make more sense to pivot / unpivot the metrics."
- ),
- }
- column_labels = {
- 'sql': "SQL",
- 'height': "Chart Height",
- 'sql_layout': "SQL Layout",
- 'show_sql': "Display the SQL Statement",
- 'default_params': "Default Parameters",
- }
- form_choices = {
- 'chart_type': [
- ('line', 'Line Chart'),
- ('spline', 'Spline Chart'),
- ('bar', 'Bar Chart'),
- ('para', 'Parallel Coordinates'),
- ('column', 'Column Chart'),
- ('area', 'Overlapping Area Chart'),
- ('stacked_area', 'Stacked Area Chart'),
- ('percent_area', 'Percent Area Chart'),
- ('heatmap', 'Heatmap'),
- ('datatable', 'No chart, data table only'),
- ],
- 'sql_layout': [
- ('series', 'SELECT series, x, y FROM ...'),
- ('columns', 'SELECT x, y (series 1), y (series 2), ... FROM ...'),
- ],
- 'conn_id': [
- (c.conn_id, c.conn_id)
- for c in (
- Session().query(models.Connection.conn_id)
- .group_by(models.Connection.conn_id)
- )
- ]
- }
-
- def on_model_change(self, form, model, is_created=True):
- if model.iteration_no is None:
- model.iteration_no = 0
- else:
- model.iteration_no += 1
- if not model.user_id and current_user and hasattr(current_user, 'id'):
- model.user_id = current_user.id
- model.last_modified = datetime.now()
-
-
-class KnowEventView(wwwutils.DataProfilingMixin, AirflowModelView):
- verbose_name = "known event"
- verbose_name_plural = "known events"
- form_columns = (
- 'label',
- 'event_type',
- 'start_date',
- 'end_date',
- 'reported_by',
- 'description')
- column_list = (
- 'label', 'event_type', 'start_date', 'end_date', 'reported_by')
- column_default_sort = ("start_date", True)
-
-
-class KnowEventTypeView(wwwutils.DataProfilingMixin, AirflowModelView):
- pass
-
-'''
-# For debugging / troubleshooting
-mv = KnowEventTypeView(
- models.KnownEventType,
- Session, name="Known Event Types", category="Manage")
-admin.add_view(mv)
-class DagPickleView(SuperUserMixin, ModelView):
- pass
-mv = DagPickleView(
- models.DagPickle,
- Session, name="Pickles", category="Manage")
-admin.add_view(mv)
-'''
-
-
-class VariableView(wwwutils.LoginMixin, AirflowModelView):
- verbose_name = "Variable"
- verbose_name_plural = "Variables"
- form_columns = (
- 'key',
- 'val',
- )
- column_list = ('key', 'is_encrypted',)
- column_filters = ('key', 'val')
- column_searchable_list = ('key', 'val')
- form_widget_args = {
- 'is_encrypted': {'disabled': True},
- 'val': {
- 'rows': 20,
- }
- }
-
-
-class JobModelView(ModelViewOnly):
- verbose_name_plural = "jobs"
- verbose_name = "job"
- column_default_sort = ('start_date', True)
- column_filters = (
- 'job_type', 'dag_id', 'state',
- 'unixname', 'hostname', 'start_date', 'end_date', 'latest_heartbeat')
- column_formatters = dict(
- start_date=datetime_f,
- end_date=datetime_f,
- hostname=nobr_f,
- state=state_f,
- latest_heartbeat=datetime_f)
-
-
-class DagRunModelView(ModelViewOnly):
- verbose_name_plural = "DAG Runs"
- can_delete = True
- can_edit = True
- can_create = True
- column_editable_list = ('state',)
- verbose_name = "dag run"
- column_default_sort = ('execution_date', True)
- form_choices = {
- 'state': [
- ('success', 'success'),
- ('running', 'running'),
- ('failed', 'failed'),
- ],
- }
- column_list = (
- 'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger')
- column_filters = column_list
- column_searchable_list = ('dag_id', 'state', 'run_id')
- column_formatters = dict(
- execution_date=datetime_f,
- state=state_f,
- start_date=datetime_f,
- dag_id=dag_link)
-
- @action('set_running', "Set state to 'running'", None)
- def action_set_running(self, ids):
- self.set_dagrun_state(ids, State.RUNNING)
-
- @action('set_failed', "Set state to 'failed'", None)
- def action_set_failed(self, ids):
- self.set_dagrun_state(ids, State.FAILED)
-
- @action('set_success', "Set state to 'success'", None)
- def action_set_success(self, ids):
- self.set_dagrun_state(ids, State.SUCCESS)
-
- @provide_session
- def set_dagrun_state(self, ids, target_state, session=None):
- try:
- DR = models.DagRun
- count = 0
- for dr in session.query(DR).filter(DR.id.in_(ids)).all():
- count += 1
- dr.state = target_state
- if target_state == State.RUNNING:
- dr.start_date = datetime.now()
- else:
- dr.end_date = datetime.now()
- session.commit()
- flash(
- "{count} dag runs were set to '{target_state}'".format(**locals()))
- except Exception as ex:
- if not self.handle_view_exception(ex):
- raise Exception("Ooops")
- flash('Failed to set state', 'error')
-
-
-class LogModelView(ModelViewOnly):
- verbose_name_plural = "logs"
- verbose_name = "log"
- column_default_sort = ('dttm', True)
- column_filters = ('dag_id', 'task_id', 'execution_date')
- column_formatters = dict(
- dttm=datetime_f, execution_date=datetime_f, dag_id=dag_link)
-
-
-class TaskInstanceModelView(ModelViewOnly):
- verbose_name_plural = "task instances"
- verbose_name = "task instance"
- column_filters = (
- 'state', 'dag_id', 'task_id', 'execution_date', 'hostname',
- 'queue', 'pool', 'operator', 'start_date', 'end_date')
- named_filter_urls = True
- column_formatters = dict(
- log=log_link, task_id=task_instance_link,
- hostname=nobr_f,
- state=state_f,
- execution_date=datetime_f,
- start_date=datetime_f,
- end_date=datetime_f,
- queued_dttm=datetime_f,
- dag_id=dag_link, duration=duration_f)
- column_searchable_list = ('dag_id', 'task_id', 'state')
- column_default_sort = ('start_date', True)
- form_choices = {
- 'state': [
- ('success', 'success'),
- ('running', 'running'),
- ('failed', 'failed'),
- ],
- }
- column_list = (
- 'state', 'dag_id', 'task_id', 'execution_date', 'operator',
- 'start_date', 'end_date', 'duration', 'job_id', 'hostname',
- 'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number',
- 'pool', 'log')
- can_delete = True
- page_size = 500
-
- @action('set_running', "Set state to 'running'", None)
- def action_set_running(self, ids):
- self.set_task_instance_state(ids, State.RUNNING)
-
- @action('set_failed', "Set state to 'failed'", None)
- def action_set_failed(self, ids):
- self.set_task_instance_state(ids, State.FAILED)
-
- @action('set_success', "Set state to 'success'", None)
- def action_set_success(self, ids):
- self.set_task_instance_state(ids, State.SUCCESS)
-
- @action('set_retry', "Set state to 'up_for_retry'", None)
- def action_set_retry(self, ids):
- self.set_task_instance_state(ids, State.UP_FOR_RETRY)
-
- @provide_session
- def set_task_instance_state(self, ids, target_state, session=None):
- try:
- TI = models.TaskInstance
- for count, id in enumerate(ids):
- task_id, dag_id, execution_date = id.split(',')
- execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S')
- ti = session.query(TI).filter(TI.task_id == task_id,
- TI.dag_id == dag_id,
- TI.execution_date == execution_date).one()
- ti.state = target_state
- count += 1
- session.commit()
- flash(
- "{count} task instances were set to '{target_state}'".format(**locals()))
- except Exception as ex:
- if not self.handle_view_exception(ex):
- raise Exception("Ooops")
- flash('Failed to set state', 'error')
-
-
-class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView):
- create_template = 'airflow/conn_create.html'
- edit_template = 'airflow/conn_edit.html'
- list_template = 'airflow/conn_list.html'
- form_columns = (
- 'conn_id',
- 'conn_type',
- 'host',
- 'schema',
- 'login',
- 'password',
- 'port',
- 'extra',
- 'extra__jdbc__drv_path',
- 'extra__jdbc__drv_clsname',
- 'extra__google_cloud_platform__project',
- 'extra__google_cloud_platform__key_path',
- 'extra__google_cloud_platform__service_account',
- 'extra__google_cloud_platform__scope',
- )
- verbose_name = "Connection"
- verbose_name_plural = "Connections"
- column_default_sort = ('conn_id', False)
- column_list = ('conn_id', 'conn_type', 'host', 'port', 'is_encrypted', 'is_extra_encrypted',)
- form_overrides = dict(_password=PasswordField)
- form_widget_args = {
- 'is_extra_encrypted': {'disabled': True},
- 'is_encrypted': {'disabled': True},
- }
- # Used to customized the form, the forms elements get rendered
- # and results are stored in the extra field as json. All of these
- # need to be prefixed with extra__ and then the conn_type ___ as in
- # extra__{conn_type}__name. You can also hide form elements and rename
- # others from the connection_form.js file
- form_extra_fields = {
- 'extra__jdbc__drv_path' : StringField('Driver Path'),
- 'extra__jdbc__drv_clsname': StringField('Driver Class'),
- 'extra__google_cloud_platform__project': StringField('Project'),
- 'extra__google_cloud_platform__key_path': StringField('Keyfile Path'),
- 'extra__google_cloud_platform__service_account': StringField('Service Account'),
- 'extra__google_cloud_platform__scope': StringField('Scopes (comma seperated)'),
-
- }
- form_choices = {
- 'conn_type': [
- ('bigquery', 'BigQuery',),
- ('datastore', 'Google Datastore'),
- ('ftp', 'FTP',),
- ('google_cloud_storage', 'Google Cloud Storage'),
- ('google_cloud_platform', 'Google Cloud Platform'),
- ('hdfs', 'HDFS',),
- ('http', 'HTTP',),
- ('hive_cli', 'Hive Client Wrapper',),
- ('hive_metastore', 'Hive Metastore Thrift',),
- ('hiveserver2', 'Hive Server 2 Thrift',),
- ('jdbc', 'Jdbc Connection',),
- ('mysql', 'MySQL',),
- ('postgres', 'Postgres',),
- ('oracle', 'Oracle',),
- ('vertica', 'Vertica',),
- ('presto', 'Presto',),
- ('s3', 'S3',),
- ('samba', 'Samba',),
- ('sqlite', 'Sqlite',),
- ('ssh', 'SSH',),
- ('cloudant', 'IBM Cloudant',),
- ('mssql', 'Microsoft SQL Server'),
- ('mesos_framework-id', 'Mesos Framework ID'),
- ]
- }
-
- def on_model_change(self, form, model, is_created):
- formdata = form.data
- if formdata['conn_type'] in ['jdbc', 'google_cloud_platform']:
- extra = {
- key:formdata[key]
- for key in self.form_extra_fields.keys() if key in formdata}
- model.extra = json.dumps(extra)
-
- @classmethod
- def alert_fernet_key(cls):
- return conf.get('core', 'fernet_key') is None
-
- @classmethod
- def is_secure(self):
- """
- Used to display a message in the Connection list view making it clear
- that the passwords and `extra` field can't be encrypted.
- """
- is_secure = False
- try:
- import cryptography
- conf.get('core', 'fernet_key')
- is_secure = True
- except:
- pass
- return is_secure
-
- def on_form_prefill(self, form, id):
- try:
- d = json.loads(form.data.get('extra', '{}'))
- except Exception as e:
- d = {}
-
- for field in list(self.form_extra_fields.keys()):
- value = d.get(field, '')
- if value:
- field = getattr(form, field)
- field.data = value
-
-
-class UserModelView(wwwutils.SuperUserMixin, AirflowModelView):
- verbose_name = "User"
- verbose_name_plural = "Users"
- column_default_sort = 'username'
-
-
-class ConfigurationView(wwwutils.SuperUserMixin, BaseView):
- @expose('/')
- def conf(self):
- raw = request.args.get('raw') == "true"
- title = "Airflow Configuration"
- subtitle = conf.AIRFLOW_CONFIG
- if conf.getboolean("webserver", "expose_config"):
- with open(conf.AIRFLOW_CONFIG, 'r') as f:
- config = f.read()
- else:
- config = (
- "# You Airflow administrator chose not to expose the "
- "configuration, most likely for security reasons.")
- if raw:
- return Response(
- response=config,
- status=200,
- mimetype="application/text")
- else:
- code_html = Markup(highlight(
- config,
- lexers.IniLexer(), # Lexer call
- HtmlFormatter(noclasses=True))
- )
- return self.render(
- 'airflow/code.html',
- pre_subtitle=settings.HEADER + " v" + airflow.__version__,
- code_html=code_html, title=title, subtitle=subtitle)
-
-
-class DagModelView(wwwutils.SuperUserMixin, ModelView):
- column_list = ('dag_id', 'owners')
- column_editable_list = ('is_paused',)
- form_excluded_columns = ('is_subdag', 'is_active')
- column_searchable_list = ('dag_id',)
- column_filters = (
- 'dag_id', 'owners', 'is_paused', 'is_active', 'is_subdag',
- 'last_scheduler_run', 'last_expired')
- form_widget_args = {
- 'last_scheduler_run': {'disabled': True},
- 'fileloc': {'disabled': True},
- 'is_paused': {'disabled': True},
- 'last_pickled': {'disabled': True},
- 'pickle_id': {'disabled': True},
- 'last_loaded': {'disabled': True},
- 'last_expired': {'disabled': True},
- 'pickle_size': {'disabled': True},
- 'scheduler_lock': {'disabled': True},
- 'owners': {'disabled': True},
- }
- column_formatters = dict(
- dag_id=dag_link,
- )
- can_delete = False
- can_create = False
- page_size = 50
- list_template = 'airflow/list_dags.html'
- named_filter_urls = True
-
- def get_query(self):
- """
- Default filters for model
- """
- return (
- super(DagModelView, self)
- .get_query()
- .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
- .filter(~models.DagModel.is_subdag)
- )
-
- def get_count_query(self):
- """
- Default filters for model
- """
- return (
- super(DagModelView, self)
- .get_count_query()
- .filter(models.DagModel.is_active)
- .filter(~models.DagModel.is_subdag)
- )