You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/06/29 20:19:34 UTC

incubator-airflow git commit: [AIRFLOW-246] Improve dag_stats endpoint query

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0923356a5 -> a2ed55f2c


[AIRFLOW-246] Improve dag_stats endpoint query

Currently, accessing /dag_stats can take a relatively long time
(e.g. over 20 seconds with fewer than a million rows in some environments).
This patch replaces multiple LEFT OUTER JOINs with INNER JOINs combined via
UNION ALL, which makes the query roughly 3-5x faster.

Closes #1610 from sekikn/AIRFLOW-246


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a2ed55f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a2ed55f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a2ed55f2

Branch: refs/heads/master
Commit: a2ed55f2c40857efaaf1876de5bc7929cbfb3166
Parents: 0923356
Author: Kengo Seki <se...@apache.org>
Authored: Wed Jun 29 22:19:26 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jun 29 22:19:26 2016 +0200

----------------------------------------------------------------------
 airflow/www/views.py | 34 +++++++++++++++++++---------------
 1 file changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a2ed55f2/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 5c18e81..1d0fdd7 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -31,7 +31,7 @@ import inspect
 import traceback
 
 import sqlalchemy as sqla
-from sqlalchemy import or_, desc, and_
+from sqlalchemy import or_, desc, and_, union_all
 
 from flask import (
     redirect, url_for, request, Markup, Response, current_app, render_template)
@@ -497,23 +497,27 @@ class Airflow(BaseView):
 
         # Select all task_instances from active dag_runs.
         # If no dag_run is active, return task instances from most recent dag_run.
-        qry = (
-            session.query(TI.dag_id, TI.state, sqla.func.count(TI.task_id))
-            .outerjoin(RunningDagRun, and_(
-                RunningDagRun.c.dag_id == TI.dag_id,
-                RunningDagRun.c.execution_date == TI.execution_date)
-            )
-            .outerjoin(LastDagRun, and_(
+        LastTI = (
+            session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
+            .join(LastDagRun, and_(
                 LastDagRun.c.dag_id == TI.dag_id,
-                LastDagRun.c.execution_date == TI.execution_date)
-            )
+                LastDagRun.c.execution_date == TI.execution_date))
             .filter(TI.task_id.in_(task_ids))
             .filter(TI.dag_id.in_(dag_ids))
-            .filter(or_(
-                RunningDagRun.c.dag_id != None,
-                LastDagRun.c.dag_id != None
-            ))
-            .group_by(TI.dag_id, TI.state)
+        )
+        RunningTI = (
+            session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
+            .join(RunningDagRun, and_(
+                RunningDagRun.c.dag_id == TI.dag_id,
+                RunningDagRun.c.execution_date == TI.execution_date))
+            .filter(TI.task_id.in_(task_ids))
+            .filter(TI.dag_id.in_(dag_ids))
+        )
+
+        UnionTI = union_all(LastTI, RunningTI).alias('union_ti')
+        qry = (
+            session.query(UnionTI.c.dag_id, UnionTI.c.state, sqla.func.count())
+            .group_by(UnionTI.c.dag_id, UnionTI.c.state)
         )
 
         data = {}