You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/07 17:30:56 UTC
incubator-airflow git commit: [AIRFLOW-1081] Improve performance of duration chart
Repository: incubator-airflow
Updated Branches:
refs/heads/master f516c9ee5 -> 0da5125ed
[AIRFLOW-1081] Improve performance of duration chart
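This commit reduces the number of database queries issued by the duration
chart to improve performance. It does this in three ways:
- It fetches all task instances of the DAG in one query instead of one query
  per task.
- It fetches all matching TaskFail rows in one query instead of one query per
  task instance.
- It reuses the fetched task instances instead of querying them a second time.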
Closes #2226 from saguziel/aguziel-duration-chart-fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0da5125e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0da5125e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0da5125e
Branch: refs/heads/master
Commit: 0da5125edb03ea867add8c46de6705a5f4b542de
Parents: f516c9e
Author: Alex Guziel <al...@airbnb.com>
Authored: Fri Apr 7 19:30:49 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Apr 7 19:30:49 2017 +0200
----------------------------------------------------------------------
airflow/www/views.py | 54 +++++++++++++++++++++++++++++------------------
1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0da5125e/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 0194e58..604ae66 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -25,6 +25,7 @@ import dateutil.parser
import copy
import json
import bleach
+from collections import defaultdict
import inspect
from textwrap import dedent
@@ -1412,25 +1413,38 @@ class Airflow(BaseView):
cum_chart = nvd3.lineChart(
name="cumLineChart", x_is_date=True, height=600, width="1200")
- y = {}
- x = {}
- cum_y = {}
- for task in dag.tasks:
- y[task.task_id] = []
- x[task.task_id] = []
- cum_y[task.task_id] = []
- for ti in task.get_task_instances(session, start_date=min_date,
- end_date=base_date):
- if ti.duration:
- dttm = wwwutils.epoch(ti.execution_date)
- x[ti.task_id].append(dttm)
- y[ti.task_id].append(float(ti.duration))
- fails = session.query(models.TaskFail).filter_by(
- task_id=ti.task_id,
- dag_id=ti.dag_id,
- execution_date=ti.execution_date).all()
- fails_total = sum([f.duration for f in fails])
- cum_y[ti.task_id].append(float(ti.duration + fails_total))
+ y = defaultdict(list)
+ x = defaultdict(list)
+ cum_y = defaultdict(list)
+
+ tis = dag.get_task_instances(
+ session, start_date=min_date, end_date=base_date)
+ TF = models.TaskFail
+ ti_fails = (
+ session
+ .query(TF)
+ .filter(
+ TF.dag_id == dag.dag_id,
+ TF.execution_date >= min_date,
+ TF.execution_date <= base_date,
+ TF.task_id.in_([t.task_id for t in dag.tasks]))
+ .all()
+ )
+
+ fails_totals = defaultdict(int)
+ for tf in ti_fails:
+ dict_key = (tf.dag_id, tf.task_id, tf.execution_date)
+ fails_totals[dict_key] += tf.duration
+
+ for ti in tis:
+ if ti.duration:
+ dttm = wwwutils.epoch(ti.execution_date)
+ x[ti.task_id].append(dttm)
+ y[ti.task_id].append(float(ti.duration))
+ fails_dict_key = (ti.dag_id, ti.task_id, ti.execution_date)
+ fails_total = fails_totals[fails_dict_key]
+ cum_y[ti.task_id].append(float(ti.duration + fails_total))
+
# determine the most relevant time unit for the set of task instance
# durations for the DAG
y_unit = infer_time_unit([d for t in y.values() for d in t])
@@ -1448,8 +1462,6 @@ class Airflow(BaseView):
y=scale_time_units(cum_y[task.task_id],
cum_y_unit))
- tis = dag.get_task_instances(
- session, start_date=min_date, end_date=base_date)
dates = sorted(list({ti.execution_date for ti in tis}))
max_date = max([ti.execution_date for ti in tis]) if dates else None