Posted to commits@airflow.apache.org by "Kengo Seki (JIRA)" <ji...@apache.org> on 2016/06/17 08:52:05 UTC
[jira] [Commented] (AIRFLOW-246) dag_stats endpoint has a terrible query
[ https://issues.apache.org/jira/browse/AIRFLOW-246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15335724#comment-15335724 ]
Kengo Seki commented on AIRFLOW-246:
------------------------------------
The multiple LEFT OUTER JOINs seem to be what hurts performance. I think we can rewrite the query in question by replacing them with INNER JOINs combined with UNION ALL, for example:
{code:sql}
SELECT
    dag_id AS task_instance_dag_id,
    state AS task_instance_state,
    count(*) AS count_1
FROM (
    SELECT
        task_instance.dag_id,
        task_instance.state
    FROM
        task_instance
    JOIN (
        SELECT
            dag_run.dag_id AS dag_id,
            dag_run.execution_date AS execution_date
        FROM
            dag_run
        WHERE
            dag_run.state = 'running'
    ) AS running_dag_run
    ON
        running_dag_run.dag_id = task_instance.dag_id
        AND
        running_dag_run.execution_date = task_instance.execution_date
    WHERE
        task_id IN ...
    UNION ALL
    SELECT
        task_instance.dag_id,
        task_instance.state
    FROM
        task_instance
    JOIN (
        SELECT
            dag_run.dag_id AS dag_id,
            max(dag_run.execution_date) AS execution_date
        FROM
            dag_run
        GROUP BY
            dag_run.dag_id
    ) AS last_dag_run
    ON
        last_dag_run.dag_id = task_instance.dag_id
        AND
        last_dag_run.execution_date = task_instance.execution_date
    WHERE
        task_id IN ...
) t
GROUP BY
    dag_id,
    state;
{code}
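One case that may be worth checking before porting the rewrite: a task instance whose DAG run is both `running` and the latest run of its DAG matches both joined subqueries. The original query still counts it once (it only tests that either join matched), but `UNION ALL` emits it from both branches and counts it twice. Below is a minimal sketch of that check under stated assumptions: SQLite instead of MySQL, stripped-down tables with only the columns the query touches, and the `task_id IN ...` filter omitted. The `union_query` helper, its `keep_duplicates` flag and the sample rows are hypothetical. The `UNION` variant (selecting `task_id` and `execution_date` so that deduplication happens per task instance) is one possible fix; its performance on MySQL has not been measured.

```python
import sqlite3

# Assumption: stripped-down tables with only the columns the dag_stats
# query touches; the real Airflow schema has many more columns.
SCHEMA = """
CREATE TABLE dag_run (
    dag_id TEXT, execution_date TEXT, state TEXT,
    PRIMARY KEY (dag_id, execution_date)
);
CREATE TABLE task_instance (
    task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT,
    PRIMARY KEY (task_id, dag_id, execution_date)
);
"""

RUNNING = ("SELECT dag_id, execution_date FROM dag_run "
           "WHERE state = 'running'")
LAST = ("SELECT dag_id, max(execution_date) AS execution_date "
        "FROM dag_run GROUP BY dag_id")

# Shape of the original query (task_id IN ... filter omitted):
# each task instance row is counted at most once.
LEFT_JOIN_QUERY = f"""
SELECT ti.dag_id, ti.state, count(ti.task_id)
FROM task_instance ti
LEFT OUTER JOIN ({RUNNING}) r
  ON r.dag_id = ti.dag_id AND r.execution_date = ti.execution_date
LEFT OUTER JOIN ({LAST}) l
  ON l.dag_id = ti.dag_id AND l.execution_date = ti.execution_date
WHERE r.dag_id IS NOT NULL OR l.dag_id IS NOT NULL
GROUP BY ti.dag_id, ti.state
"""


def union_query(keep_duplicates: bool) -> str:
    """Hypothetical helper building the INNER JOIN + UNION rewrite.

    keep_duplicates=True counts like the proposed UNION ALL query.
    keep_duplicates=False uses UNION over the full task instance key, so a
    row matched by both branches is counted once.
    """
    op = "UNION ALL" if keep_duplicates else "UNION"
    return f"""
SELECT dag_id, state, count(*) FROM (
  SELECT ti.dag_id, ti.task_id, ti.execution_date, ti.state
  FROM task_instance ti JOIN ({RUNNING}) r
    ON r.dag_id = ti.dag_id AND r.execution_date = ti.execution_date
  {op}
  SELECT ti.dag_id, ti.task_id, ti.execution_date, ti.state
  FROM task_instance ti JOIN ({LAST}) l
    ON l.dag_id = ti.dag_id AND l.execution_date = ti.execution_date
) t
GROUP BY dag_id, state
"""


def run(conn: sqlite3.Connection, sql: str) -> list:
    # Sort so results can be compared regardless of row order.
    return sorted(conn.execute(sql).fetchall())


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany("INSERT INTO dag_run VALUES (?, ?, ?)", [
    ("demo", "2016-06-01", "success"),
    ("demo", "2016-06-02", "running"),   # latest run AND running: overlap
    ("other", "2016-06-01", "running"),  # running but not latest
    ("other", "2016-06-02", "success"),  # latest but not running
])
conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?, ?)", [
    ("t1", "demo", "2016-06-01", "success"),
    ("t1", "demo", "2016-06-02", "running"),
    ("t2", "demo", "2016-06-02", "queued"),
    ("t1", "other", "2016-06-01", "running"),
    ("t1", "other", "2016-06-02", "success"),
])

for label, sql in [("left join", LEFT_JOIN_QUERY),
                   ("union all", union_query(keep_duplicates=True)),
                   ("union", union_query(keep_duplicates=False))]:
    print(label, run(conn, sql))
```

With these rows the `demo` counts come out doubled under `UNION ALL`, while the `other` DAG (running run and latest run are different) agrees across all three queries. The identical outputs in the MySQL transcript below suggest the dummy data had no such overlap; on a live deployment, where the newest run of a DAG is often the one still running, that may not hold.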
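I compared the original query and the rewritten one on some dummy data and got a 3-4x improvement (0.39 s vs. 0.11 s, and 0.40 s vs. 0.13 s with the execution order swapped):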
{code}
mysql> select count(*) from dag_run;
+----------+
| count(*) |
+----------+
| 3417 |
+----------+
1 row in set (0.00 sec)
mysql> select count(*) from task_instance;
+----------+
| count(*) |
+----------+
| 229089 |
+----------+
1 row in set (0.00 sec)
mysql> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state AS task_instance_state, count(task_instance.task_id) AS count_1 FROM task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 'running') AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id AND running_dag_run.execution_date = task_instance.execution_date LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY dag_run.dag_id) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = task_instance.execution_date WHERE task_instance.task_id IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 'true_1', 'true_2') AND (running_dag_run.dag_id IS NOT NULL OR last_dag_run.dag_id IS NOT NULL) GROUP BY task_instance.dag_id, task_instance.state;
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.39 sec)
mysql> SELECT dag_id AS task_instance_dag_id, state AS task_instance_state, count(*) as count_1 FROM ( SELECT task_instance.dag_id, task_instance.state FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id, dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 'running' ) AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id AND running_dag_run.execution_date = task_instance.execution_date WHERE task_id IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 'true_1', 'true_2') UNION ALL SELECT task_instance.dag_id, task_instance.state FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY dag_run.dag_id ) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = task_instance.execution_date WHERE task_id IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 'false_2', 'final_1', 
'final_2', 'follow_branch_a', 'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 'true_1', 'true_2') ) t GROUP BY dag_id, state;
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.11 sec)
mysql> -- swap execution order, just in case
mysql> reset query cache;
Query OK, 0 rows affected (0.00 sec)
mysql> SELECT dag_id AS task_instance_dag_id, state AS task_instance_state, count(*) as count_1 FROM ( SELECT task_instance.dag_id, task_instance.state FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id, dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 'running' ) AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id AND running_dag_run.execution_date = task_instance.execution_date WHERE task_id IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 'true_1', 'true_2') UNION ALL SELECT task_instance.dag_id, task_instance.state FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY dag_run.dag_id ) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = task_instance.execution_date WHERE task_id IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 'false_2', 'final_1', 
'final_2', 'follow_branch_a', 'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 'true_1', 'true_2') ) t GROUP BY dag_id, state;
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.13 sec)
mysql> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state AS task_instance_state, count(task_instance.task_id) AS count_1 FROM task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 'running') AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id AND running_dag_run.execution_date = task_instance.execution_date LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY dag_run.dag_id) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = task_instance.execution_date WHERE task_instance.task_id IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 'true_1', 'true_2') AND (running_dag_run.dag_id IS NOT NULL OR last_dag_run.dag_id IS NOT NULL) GROUP BY task_instance.dag_id, task_instance.state;
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.40 sec)
{code}
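The manual comparison above, including the swapped execution order, could be scripted roughly as follows. This is a minimal sketch, not part of Airflow: the `time_queries` helper is hypothetical, and it only uses the PEP 249 (DB-API 2.0) calls `cursor()`, `execute()`, `fetchall()` and `close()`, so in principle a MySQL driver's connection could be passed in place of the SQLite one used for the demo. It does not handle MySQL's query cache; that still needs a step like the `reset query cache;` above.

```python
import sqlite3
import statistics
import time
from typing import Any, Dict, List


def time_queries(conn: Any, queries: Dict[str, str],
                 rounds: int = 5) -> Dict[str, float]:
    """Return the median wall-clock time in seconds for each query.

    The execution order is reversed on every other round so that no query
    consistently runs first and benefits from warmed-up caches.
    """
    names = list(queries)
    samples: Dict[str, List[float]] = {name: [] for name in names}
    for i in range(rounds):
        order = names if i % 2 == 0 else names[::-1]
        for name in order:
            cur = conn.cursor()
            start = time.perf_counter()
            cur.execute(queries[name])
            cur.fetchall()  # fetch every row so the query fully executes
            samples[name].append(time.perf_counter() - start)
            cur.close()
    return {name: statistics.median(ts) for name, ts in samples.items()}


if __name__ == "__main__":
    # Hypothetical toy data; SQLite timings say nothing about MySQL.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (x INTEGER)")
    conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(10000)])
    print(time_queries(conn, {
        "count": "SELECT count(*) FROM t",
        "grouped": "SELECT x % 10, count(*) FROM t GROUP BY x % 10",
    }))
```

Dividing the median of the LEFT OUTER JOIN query by the median of the rewrite gives a speedup figure comparable to the 3-4x reported above, while taking the median over several alternating rounds makes a single slow or cached run less likely to skew it.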
[~nhanlon] Could you try the new query and confirm whether it helps in your environment? If it does, I'll try to fix the SQLAlchemy code.
> dag_stats endpoint has a terrible query
> ---------------------------------------
>
> Key: AIRFLOW-246
> URL: https://issues.apache.org/jira/browse/AIRFLOW-246
> Project: Apache Airflow
> Issue Type: Bug
> Components: webserver
> Affects Versions: Airflow 1.7.1
> Environment: MySQL Backend through sqlalchemy
> Reporter: Neil Hanlon
>
> Hitting this endpoint creates a series of queries on the database which take over 20 seconds to run, so the page doesn't load for that entire time. Luckily the main page (which includes this under "Recent Statuses") loads it asynchronously, but still... waiting almost half a minute (sometimes longer) to see the statuses for DAGs is really not fun.
> We have fewer than a million rows in the task_instance table, so table size alone shouldn't be the problem.
> Here's a query profile for the query:
> https://gist.github.com/NeilHanlon/613f12724e802bc51c23fca7d46d28bf
> We've done some optimizations on the database, but to no avail.
> The query:
> {code:sql}
> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state AS task_instance_state, count(task_instance.task_id) AS count_1 FROM task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 'running') AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id AND running_dag_run.execution_date = task_instance.execution_date LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY dag_run.dag_id) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = task_instance.execution_date WHERE task_instance.task_id IN ... AND (running_dag_run.dag_id IS NOT NULL OR last_dag_run.dag_id IS NOT NULL) GROUP BY task_instance.dag_id, task_instance.state;
> {code}