You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/13 04:45:28 UTC

[30/45] incubator-airflow git commit: [AIRFLOW-937] Improve performance of task_stats

[AIRFLOW-937] Improve performance of task_stats

Please accept this PR that addresses the following issues:
- https://issues.apache.org/jira/browse/AIRFLOW-937

Testing Done:
- Shouldn't change functionality significantly;
  should pass existing tests (if they exist).

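This leads to slightly different results, but it
reduces the time of this endpoint from 90s to 9s
on our data. For example, DAGs are now selected by
the DagModel.is_active flag in the database rather
than by the in-memory dagbag, so subdags are no
longer excluded via dag.is_subdag. The existing
logic for task_ids was already incorrect anyway,
because task_ids are not necessarily distinct
across DAGs.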

Closes #2121 from saguziel/task-stats-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/66f39ca0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/66f39ca0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/66f39ca0

Branch: refs/heads/v1-8-stable
Commit: 66f39ca0c3511da2ff86858ce7ea569d11adbd44
Parents: 0964f18
Author: Alex Guziel <al...@airbnb.com>
Authored: Thu Mar 2 14:04:49 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:21:13 2017 -0700

----------------------------------------------------------------------
 airflow/www/views.py | 16 +++++-----------
 1 file changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/66f39ca0/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d8acfef..d1a1f9a 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -497,26 +497,24 @@ class Airflow(BaseView):
 
     @expose('/task_stats')
     def task_stats(self):
-        task_ids = []
-        dag_ids = []
-        for dag in dagbag.dags.values():
-            task_ids += dag.task_ids
-            if not dag.is_subdag:
-                dag_ids.append(dag.dag_id)
-
         TI = models.TaskInstance
         DagRun = models.DagRun
+        Dag = models.DagModel
         session = Session()
 
         LastDagRun = (
             session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
+            .join(Dag, Dag.dag_id == DagRun.dag_id)
             .filter(DagRun.state != State.RUNNING)
+            .filter(Dag.is_active == 1)
             .group_by(DagRun.dag_id)
             .subquery('last_dag_run')
         )
         RunningDagRun = (
             session.query(DagRun.dag_id, DagRun.execution_date)
+            .join(Dag, Dag.dag_id == DagRun.dag_id)
             .filter(DagRun.state == State.RUNNING)
+            .filter(Dag.is_active == 1)
             .subquery('running_dag_run')
         )
 
@@ -527,16 +525,12 @@ class Airflow(BaseView):
             .join(LastDagRun, and_(
                 LastDagRun.c.dag_id == TI.dag_id,
                 LastDagRun.c.execution_date == TI.execution_date))
-            .filter(TI.task_id.in_(task_ids))
-            .filter(TI.dag_id.in_(dag_ids))
         )
         RunningTI = (
             session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
             .join(RunningDagRun, and_(
                 RunningDagRun.c.dag_id == TI.dag_id,
                 RunningDagRun.c.execution_date == TI.execution_date))
-            .filter(TI.task_id.in_(task_ids))
-            .filter(TI.dag_id.in_(dag_ids))
         )
 
         UnionTI = union_all(LastTI, RunningTI).alias('union_ti')