You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/07 17:30:56 UTC

incubator-airflow git commit: [AIRFLOW-1081] Improve performance of duration chart

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f516c9ee5 -> 0da5125ed


[AIRFLOW-1081] Improve performance of duration chart

This commit reduces the number of queries to
improve perf.

Closes #2226 from saguziel/aguziel-duration-chart-
fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0da5125e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0da5125e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0da5125e

Branch: refs/heads/master
Commit: 0da5125edb03ea867add8c46de6705a5f4b542de
Parents: f516c9e
Author: Alex Guziel <al...@airbnb.com>
Authored: Fri Apr 7 19:30:49 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Apr 7 19:30:49 2017 +0200

----------------------------------------------------------------------
 airflow/www/views.py | 54 +++++++++++++++++++++++++++++------------------
 1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0da5125e/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 0194e58..604ae66 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -25,6 +25,7 @@ import dateutil.parser
 import copy
 import json
 import bleach
+from collections import defaultdict
 
 import inspect
 from textwrap import dedent
@@ -1412,25 +1413,38 @@ class Airflow(BaseView):
         cum_chart = nvd3.lineChart(
             name="cumLineChart", x_is_date=True, height=600, width="1200")
 
-        y = {}
-        x = {}
-        cum_y = {}
-        for task in dag.tasks:
-            y[task.task_id] = []
-            x[task.task_id] = []
-            cum_y[task.task_id] = []
-            for ti in task.get_task_instances(session, start_date=min_date,
-                                              end_date=base_date):
-                if ti.duration:
-                    dttm = wwwutils.epoch(ti.execution_date)
-                    x[ti.task_id].append(dttm)
-                    y[ti.task_id].append(float(ti.duration))
-                    fails = session.query(models.TaskFail).filter_by(
-                        task_id=ti.task_id,
-                        dag_id=ti.dag_id,
-                        execution_date=ti.execution_date).all()
-                    fails_total = sum([f.duration for f in fails])
-                    cum_y[ti.task_id].append(float(ti.duration + fails_total))
+        y = defaultdict(list)
+        x = defaultdict(list)
+        cum_y = defaultdict(list)
+
+        tis = dag.get_task_instances(
+            session, start_date=min_date, end_date=base_date)
+        TF = models.TaskFail
+        ti_fails = (
+            session
+                .query(TF)
+                .filter(
+                    TF.dag_id == dag.dag_id,
+                    TF.execution_date >= min_date,
+                    TF.execution_date <= base_date,
+                    TF.task_id.in_([t.task_id for t in dag.tasks]))
+                .all()
+        )
+
+        fails_totals = defaultdict(int)
+        for tf in ti_fails:
+            dict_key = (tf.dag_id, tf.task_id, tf.execution_date)
+            fails_totals[dict_key] += tf.duration
+
+        for ti in tis:
+            if ti.duration:
+                dttm = wwwutils.epoch(ti.execution_date)
+                x[ti.task_id].append(dttm)
+                y[ti.task_id].append(float(ti.duration))
+                fails_dict_key = (ti.dag_id, ti.task_id, ti.execution_date)
+                fails_total = fails_totals[fails_dict_key]
+                cum_y[ti.task_id].append(float(ti.duration + fails_total))
+
         # determine the most relevant time unit for the set of task instance
         # durations for the DAG
         y_unit = infer_time_unit([d for t in y.values() for d in t])
@@ -1448,8 +1462,6 @@ class Airflow(BaseView):
                                     y=scale_time_units(cum_y[task.task_id],
                                     cum_y_unit))
 
-        tis = dag.get_task_instances(
-            session, start_date=min_date, end_date=base_date)
         dates = sorted(list({ti.execution_date for ti in tis}))
         max_date = max([ti.execution_date for ti in tis]) if dates else None