You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/06/29 20:19:34 UTC
incubator-airflow git commit: [AIRFLOW-246] Improve dag_stats
endpoint query
Repository: incubator-airflow
Updated Branches:
refs/heads/master 0923356a5 -> a2ed55f2c
[AIRFLOW-246] Improve dag_stats endpoint query
Currently, accessing /dag_stats can take a relatively long time
(e.g. over 20 seconds with fewer than a million rows in some environments).
This patch replaces the multiple LEFT OUTER JOINs with INNER JOINs combined
via UNION ALL, making the query 3-5x faster.
Closes #1610 from sekikn/AIRFLOW-246
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a2ed55f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a2ed55f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a2ed55f2
Branch: refs/heads/master
Commit: a2ed55f2c40857efaaf1876de5bc7929cbfb3166
Parents: 0923356
Author: Kengo Seki <se...@apache.org>
Authored: Wed Jun 29 22:19:26 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jun 29 22:19:26 2016 +0200
----------------------------------------------------------------------
airflow/www/views.py | 34 +++++++++++++++++++---------------
1 file changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a2ed55f2/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 5c18e81..1d0fdd7 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -31,7 +31,7 @@ import inspect
import traceback
import sqlalchemy as sqla
-from sqlalchemy import or_, desc, and_
+from sqlalchemy import or_, desc, and_, union_all
from flask import (
redirect, url_for, request, Markup, Response, current_app, render_template)
@@ -497,23 +497,27 @@ class Airflow(BaseView):
# Select all task_instances from active dag_runs.
# If no dag_run is active, return task instances from most recent dag_run.
- qry = (
- session.query(TI.dag_id, TI.state, sqla.func.count(TI.task_id))
- .outerjoin(RunningDagRun, and_(
- RunningDagRun.c.dag_id == TI.dag_id,
- RunningDagRun.c.execution_date == TI.execution_date)
- )
- .outerjoin(LastDagRun, and_(
+ LastTI = (
+ session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
+ .join(LastDagRun, and_(
LastDagRun.c.dag_id == TI.dag_id,
- LastDagRun.c.execution_date == TI.execution_date)
- )
+ LastDagRun.c.execution_date == TI.execution_date))
.filter(TI.task_id.in_(task_ids))
.filter(TI.dag_id.in_(dag_ids))
- .filter(or_(
- RunningDagRun.c.dag_id != None,
- LastDagRun.c.dag_id != None
- ))
- .group_by(TI.dag_id, TI.state)
+ )
+ RunningTI = (
+ session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
+ .join(RunningDagRun, and_(
+ RunningDagRun.c.dag_id == TI.dag_id,
+ RunningDagRun.c.execution_date == TI.execution_date))
+ .filter(TI.task_id.in_(task_ids))
+ .filter(TI.dag_id.in_(dag_ids))
+ )
+
+ UnionTI = union_all(LastTI, RunningTI).alias('union_ti')
+ qry = (
+ session.query(UnionTI.c.dag_id, UnionTI.c.state, sqla.func.count())
+ .group_by(UnionTI.c.dag_id, UnionTI.c.state)
)
data = {}