Posted to commits@airflow.apache.org by "Kengo Seki (JIRA)" <ji...@apache.org> on 2016/06/17 08:52:05 UTC

[jira] [Commented] (AIRFLOW-246) dag_stats endpoint has a terrible query

    [ https://issues.apache.org/jira/browse/AIRFLOW-246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15335724#comment-15335724 ] 

Kengo Seki commented on AIRFLOW-246:
------------------------------------

The multiple left outer joins seem to hurt performance. I think we can rewrite the query in question by replacing the left outer joins with inner joins combined via UNION ALL, such as:

{code:sql}
SELECT
  dag_id AS task_instance_dag_id,
  state AS task_instance_state,
  count(*) AS count_1
FROM (
  SELECT
    task_instance.dag_id,
    task_instance.state
  FROM
    task_instance
  JOIN (
    SELECT
      dag_run.dag_id AS dag_id,
      dag_run.execution_date AS execution_date
    FROM
      dag_run
    WHERE
      dag_run.state = 'running'
  ) AS running_dag_run
  ON
    running_dag_run.dag_id = task_instance.dag_id
  AND
    running_dag_run.execution_date = task_instance.execution_date
  WHERE
    task_id IN ...

  UNION ALL

  SELECT
    task_instance.dag_id,
    task_instance.state
  FROM
    task_instance
  JOIN (
    SELECT
      dag_run.dag_id AS dag_id,
      max(dag_run.execution_date) AS execution_date
    FROM
      dag_run
    GROUP BY
      dag_run.dag_id
  ) AS last_dag_run
  ON
    last_dag_run.dag_id = task_instance.dag_id
  AND
    last_dag_run.execution_date = task_instance.execution_date
  WHERE
    task_id IN ...
) t
GROUP BY
  dag_id,
  state;
{code}

I compared the two queries on some dummy data and got a roughly 3-4x improvement (0.39-0.40 sec for the original vs. 0.11-0.13 sec for the rewrite):

{code}
mysql> select count(*) from dag_run;
+----------+
| count(*) |
+----------+
|     3417 |
+----------+
1 row in set (0.00 sec)

mysql> select count(*) from task_instance;
+----------+
| count(*) |
+----------+
|   229089 |
+----------+
1 row in set (0.00 sec)

mysql> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state AS task_instance_state, count(task_instance.task_id) AS count_1 FROM task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 'running') AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id AND running_dag_run.execution_date = task_instance.execution_date LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY dag_run.dag_id) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = task_instance.execution_date WHERE task_instance.task_id IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 'true_1', 'true_2') AND (running_dag_run.dag_id IS NOT NULL OR last_dag_run.dag_id IS NOT NULL) GROUP BY task_instance.dag_id, task_instance.state; 
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.39 sec)

mysql> SELECT dag_id AS task_instance_dag_id, state AS task_instance_state, count(*) as count_1 FROM ( SELECT task_instance.dag_id, task_instance.state FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id, dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 'running' ) AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id AND running_dag_run.execution_date = task_instance.execution_date WHERE task_id IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 'true_1', 'true_2') UNION ALL SELECT task_instance.dag_id, task_instance.state FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY dag_run.dag_id ) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = task_instance.execution_date WHERE task_id IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 'false_2', 'final_1', 
'final_2', 'follow_branch_a', 'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 'true_1', 'true_2') ) t GROUP BY dag_id, state;
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.11 sec)

mysql> -- swap execution order, just in case
mysql> reset query cache;
Query OK, 0 rows affected (0.00 sec)

mysql> SELECT dag_id AS task_instance_dag_id, state AS task_instance_state, count(*) as count_1 FROM ( SELECT task_instance.dag_id, task_instance.state FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id, dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 'running' ) AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id AND running_dag_run.execution_date = task_instance.execution_date WHERE task_id IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 'true_1', 'true_2') UNION ALL SELECT task_instance.dag_id, task_instance.state FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY dag_run.dag_id ) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = task_instance.execution_date WHERE task_id IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 'false_2', 'final_1', 
'final_2', 'follow_branch_a', 'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 'true_1', 'true_2') ) t GROUP BY dag_id, state;
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.13 sec)

mysql> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state AS task_instance_state, count(task_instance.task_id) AS count_1 FROM task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 'running') AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id AND running_dag_run.execution_date = task_instance.execution_date LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY dag_run.dag_id) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = task_instance.execution_date WHERE task_instance.task_id IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 'true_1', 'true_2') AND (running_dag_run.dag_id IS NOT NULL OR last_dag_run.dag_id IS NOT NULL) GROUP BY task_instance.dag_id, task_instance.state;
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.40 sec)
{code}
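
Besides timing, it may be worth checking that the two query shapes return the same counts on a dataset small enough to verify by hand. Below is a minimal sketch that runs both against an in-memory SQLite database through Python's sqlite3 module. The schema is cut down to the columns the queries touch, and the rows and the short task_id list are made up for illustration; this is not the real Airflow schema, nor the dummy data used in the MySQL session above.

```python
import sqlite3

# Cut-down, made-up schema and rows for illustration (not the real Airflow tables).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, state TEXT);
CREATE TABLE task_instance (task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT);
INSERT INTO dag_run VALUES
  ('tutorial',     '2016-06-15', 'success'),
  ('tutorial',     '2016-06-16', 'success'),
  ('example_xcom', '2016-06-15', 'running'),
  ('example_xcom', '2016-06-16', 'success');
INSERT INTO task_instance VALUES
  ('print_date', 'tutorial',     '2016-06-15', 'success'),
  ('print_date', 'tutorial',     '2016-06-16', 'success'),
  ('sleep',      'tutorial',     '2016-06-16', 'success'),
  ('push',       'example_xcom', '2016-06-15', 'running'),
  ('push',       'example_xcom', '2016-06-16', 'success'),
  ('puller',     'example_xcom', '2016-06-16', 'success');
""")

# Hypothetical stand-in for the long task_id list in the real query.
TASK_IDS = "('print_date', 'sleep', 'push', 'puller')"
RUNNING = "SELECT dag_id, execution_date FROM dag_run WHERE state = 'running'"
LAST = "SELECT dag_id, max(execution_date) AS execution_date FROM dag_run GROUP BY dag_id"

# Original shape: two left outer joins plus an IS NOT NULL filter.
LEFT_JOIN_QUERY = f"""
SELECT task_instance.dag_id, task_instance.state, count(task_instance.task_id)
FROM task_instance
LEFT OUTER JOIN ({RUNNING}) AS running_dag_run
  ON running_dag_run.dag_id = task_instance.dag_id
 AND running_dag_run.execution_date = task_instance.execution_date
LEFT OUTER JOIN ({LAST}) AS last_dag_run
  ON last_dag_run.dag_id = task_instance.dag_id
 AND last_dag_run.execution_date = task_instance.execution_date
WHERE task_instance.task_id IN {TASK_IDS}
  AND (running_dag_run.dag_id IS NOT NULL OR last_dag_run.dag_id IS NOT NULL)
GROUP BY task_instance.dag_id, task_instance.state
"""

# Proposed shape: two inner joins combined with UNION ALL.
UNION_QUERY = f"""
SELECT dag_id, state, count(*)
FROM (
  SELECT task_instance.dag_id AS dag_id, task_instance.state AS state
  FROM task_instance
  JOIN ({RUNNING}) AS running_dag_run
    ON running_dag_run.dag_id = task_instance.dag_id
   AND running_dag_run.execution_date = task_instance.execution_date
  WHERE task_id IN {TASK_IDS}
  UNION ALL
  SELECT task_instance.dag_id AS dag_id, task_instance.state AS state
  FROM task_instance
  JOIN ({LAST}) AS last_dag_run
    ON last_dag_run.dag_id = task_instance.dag_id
   AND last_dag_run.execution_date = task_instance.execution_date
  WHERE task_id IN {TASK_IDS}
) t
GROUP BY dag_id, state
"""


def run(query):
    """Return the grouped counts as a sorted list of (dag_id, state, count)."""
    return sorted(conn.execute(query).fetchall())


# Case 1: the running run is not the latest run of its DAG.
before_left, before_union = run(LEFT_JOIN_QUERY), run(UNION_QUERY)
print(before_left == before_union)

# Case 2: make the latest 'tutorial' run a running one as well.
conn.execute(
    "UPDATE dag_run SET state = 'running' "
    "WHERE dag_id = 'tutorial' AND execution_date = '2016-06-16'"
)
after_left, after_union = run(LEFT_JOIN_QUERY), run(UNION_QUERY)
print(after_left)
print(after_union)
```

With this toy data the two result sets are identical as long as no DAG run is both running and the latest one. Once the latest tutorial run is marked as running, the UNION ALL version picks up its task instances from both branches, so its count for that group is 4 while the left outer join version still reports 2. That overlap case is probably worth checking against real data before changing the sqlalchemy code.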

[~nhanlon] Could you try the new query and confirm whether it helps in your environment? If it does, I'll try to fix the sqlalchemy code accordingly.

> dag_stats endpoint has a terrible query
> ---------------------------------------
>
>                 Key: AIRFLOW-246
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-246
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: webserver
>    Affects Versions: Airflow 1.7.1
>         Environment: MySQL Backend through sqlalchemy
>            Reporter: Neil Hanlon
>
> Hitting this endpoint triggers a series of database queries that take over 20 seconds to run, and the page does not load for that entire time. Luckily the main page (which includes this under "Recent Statuses") loads it asynchronously, but still... waiting almost half a minute (at times more) to see the statuses for DAGs is really not fun.
> We have fewer than a million rows in the task_instance table, so table size alone shouldn't be the problem.
> Here's a query profile for the query:
> https://gist.github.com/NeilHanlon/613f12724e802bc51c23fca7d46d28bf
> We've done some optimizations on the database, but to no avail.
> The query:
> {code:sql}
> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state AS task_instance_state, count(task_instance.task_id) AS count_1 FROM task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 'running') AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id AND running_dag_run.execution_date = task_instance.execution_date LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY dag_run.dag_id) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = task_instance.execution_date WHERE task_instance.task_id IN ... AND (running_dag_run.dag_id IS NOT NULL OR last_dag_run.dag_id IS NOT NULL) GROUP BY task_instance.dag_id, task_instance.state;
> {code}


