You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "t oo (Jira)" <ji...@apache.org> on 2019/12/26 17:18:00 UTC

[jira] [Created] (AIRFLOW-6360) config option to skip task_stats from getting completed dagruns/tis

t oo created AIRFLOW-6360:
-----------------------------

             Summary: config option to skip task_stats from getting completed dagruns/tis
                 Key: AIRFLOW-6360
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6360
             Project: Apache Airflow
          Issue Type: Improvement
          Components: ui
    Affects Versions: 1.10.6
            Reporter: t oo


The task_stats endpoint, which displays 'recent tasks', is very slow when someone has many dagruns or many tasks in a dag.


BEFORE
 LastDagRun = (
            session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
                .join(Dag, Dag.dag_id == DagRun.dag_id)
                .filter(DagRun.state != State.RUNNING)
                .filter(Dag.is_active == True)  # noqa: E712
                .filter(Dag.is_subdag == False)  # noqa: E712
                .group_by(DagRun.dag_id)
                .subquery('last_dag_run')
        )
        RunningDagRun = (
            session.query(DagRun.dag_id, DagRun.execution_date)
                .join(Dag, Dag.dag_id == DagRun.dag_id)
                .filter(DagRun.state == State.RUNNING)
                .filter(Dag.is_active == True)  # noqa: E712
                .filter(Dag.is_subdag == False)  # noqa: E712
                .subquery('running_dag_run')
        )

        # Select all task_instances from active dag_runs.
        # If no dag_run is active, return task instances from most recent dag_run.
        LastTI = (
            session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
            .join(LastDagRun, and_(
                LastDagRun.c.dag_id == TI.dag_id,
                LastDagRun.c.execution_date == TI.execution_date))
        )
        RunningTI = (
            session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
            .join(RunningDagRun, and_(
                RunningDagRun.c.dag_id == TI.dag_id,
                RunningDagRun.c.execution_date == TI.execution_date))
        )

        UnionTI = union_all(LastTI, RunningTI).alias('union_ti')
        qry = (
            session.query(UnionTI.c.dag_id, UnionTI.c.state, sqla.func.count())
            .group_by(UnionTI.c.dag_id, UnionTI.c.state)
        )



AFTER
# We are not interested in stats for dagruns that have already completed; we only want active ones.

        RunningDagRun = (
            session.query(DagRun.dag_id, DagRun.execution_date)
                .join(Dag, Dag.dag_id == DagRun.dag_id)
                .filter(DagRun.state == State.RUNNING,
                Dag.is_active,
                Dag.is_subdag == False)  # noqa: E712
                .subquery('running_dag_run')
        )

        # Select all task_instances from active dag_runs.
        qry = (
            session.query(TI.dag_id.label('dag_id'), TI.state.label('state'), sqla.func.count())
            .join(RunningDagRun, and_(
                RunningDagRun.c.dag_id == TI.dag_id,
                RunningDagRun.c.execution_date == TI.execution_date))
            .group_by(TI.dag_id, TI.state)
        )





--
This message was sent by Atlassian Jira
(v8.3.4#803005)