Posted to commits@airflow.apache.org by "Qian Yu (Jira)" <ji...@apache.org> on 2020/03/14 11:04:00 UTC

[jira] [Updated] (AIRFLOW-7063) dag.clear() slowness caused by multiple UNION statements and tis.count()

     [ https://issues.apache.org/jira/browse/AIRFLOW-7063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Qian Yu updated AIRFLOW-7063:
-----------------------------
    Description: 
When multiple {{ExternalTaskMarker}} are used, {{dag.clear()}} becomes very slow when clearing all the {{ExternalTaskMarker}} together. The slowness turns out to come from this line of code in {{dag.clear()}}:
{code:python}
        if dry_run:
            tis = tis.all()
            session.expunge_all()
            return tis

        count = tis.count()  # <------- This line is the culprit
        do_it = True
        if count == 0:
            return 0
{code}
This is the SQL generated by {{tis.count()}} when three {{ExternalTaskMarker}}s are cleared together. Note that there's nothing wrong with the SQL itself: it's reasonably efficient when executed on Postgres, even with many more UNION statements (e.g. 30 UNION statements take about 13 ms in the Docker container I started with Breeze).
However, SQLAlchemy takes more than three minutes to construct this count query before it is sent to the database.
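A minimal sketch of the point above that the SQL itself is cheap. It uses Python's built-in {{sqlite3}} rather than Postgres (an assumption; no timings are claimed here). The toy {{task_instance}} table keeps only three columns, the predicates are simplified to equality checks instead of the {{LIKE}}/{{IN}}/date-range filters in the logged query, and {{build_flat_union_count}} is a hypothetical helper. It sends one flat {{SELECT count(*) FROM (... UNION ...)}} with 32 branches straight to the database, with no ORM query construction involved.

{code:python}
import sqlite3

# Toy stand-in for Airflow's task_instance table (assumption: only the three
# columns used in the WHERE clauses are kept; dates are stored as ISO strings).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, execution_date TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?)",
    [
        ("agg_dag", "start", "2015-01-04"),
        ("daily_dag", "daily_tas", "2015-01-02"),
        ("daily_dag", "daily_tas", "2015-01-03"),
        ("daily_dag", "daily_tas", "2015-01-04"),
    ],
)


def build_flat_union_count(branches):
    """Hypothetical helper: one flat SELECT count(*) over a UNION of filters.

    Each branch is a (dag_id, task_id, execution_date) tuple and becomes one
    SELECT in the UNION, with no extra level of subquery nesting per branch.
    """
    branch_sql = (
        "SELECT dag_id, task_id, execution_date FROM task_instance "
        "WHERE dag_id = ? AND task_id = ? AND execution_date = ?"
    )
    sql = "SELECT count(*) FROM (" + " UNION ".join([branch_sql] * len(branches)) + ")"
    params = [value for branch in branches for value in branch]
    return sql, params


filters = [
    ("agg_dag", "start", "2015-01-04"),
    ("daily_dag", "daily_tas", "2015-01-03"),
    ("daily_dag", "daily_tas", "2015-01-02"),
    ("daily_dag", "daily_tas", "2015-01-04"),
]
# Repeat the filters to get 32 UNION branches; UNION removes the duplicate rows.
sql, params = build_flat_union_count(filters * 8)
count = conn.execute(sql, params).fetchone()[0]
print(sql.count(" UNION "), count)  # 31 4
{code}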

The fix is simple: drop the {{count()}} call and query all the entries from the DB instead. With {{tis.count()}} removed, the function becomes ten times faster.
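A minimal sketch of the proposed change, assuming SQLAlchemy 1.4 or later and an in-memory SQLite database. {{TaskInstance}} here is a toy model, not Airflow's, and {{clear_sketch}} is a hypothetical reduction of the {{dag.clear()}} excerpt quoted in this issue: the rows are fetched once with {{.all()}} and counted with {{len()}}, so SQLAlchemy never has to build the extra {{SELECT count(*) FROM (...)}} wrapper around the UNION query. This is not necessarily the patch that was eventually merged.

{code:python}
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class TaskInstance(Base):
    """Toy model standing in for Airflow's TaskInstance (assumption)."""

    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String)
    task_id = Column(String)


def clear_sketch(tis, session, dry_run=False):
    """Hypothetical reduction of the dag.clear() excerpt quoted in this issue."""
    if dry_run:
        tis = tis.all()
        session.expunge_all()
        return tis

    # Instead of tis.count(), which compiles an extra SELECT count(*) FROM (...)
    # around the whole UNION query, fetch the rows once and count them in Python.
    tis = tis.all()
    count = len(tis)
    if count == 0:
        return 0
    # ... the real method goes on to clear the task instances here ...
    return count


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(
        [
            TaskInstance(dag_id="agg_dag", task_id="start"),
            TaskInstance(dag_id="daily_dag", task_id="daily_tas"),
        ]
    )
    session.commit()

    q1 = session.query(TaskInstance).filter(TaskInstance.dag_id == "agg_dag")
    q2 = session.query(TaskInstance).filter(TaskInstance.dag_id == "daily_dag")
    tis = q1.union(q2)

    legacy_count = tis.count()  # only for comparison in this sketch
    new_count = clear_sketch(tis, session)
    print(legacy_count, new_count)  # 2 2
{code}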

 Others have reported similar problems with SQLAlchemy's {{count()}} being slower than the query itself, and it does not look like SQLAlchemy is going to fix this issue:
 [https://stackoverflow.com/questions/14754994/why-is-sqlalchemy-count-much-slower-than-the-raw-query]
 [https://gist.github.com/hest/8798884]

 
{code:sql}
[2020-03-14 09:42:50,264] {base.py:1203} INFO - SELECT count(*) AS count_1
FROM (SELECT anon_2.anon_3_anon_4_task_instance_try_number AS anon_2_anon_3_anon_4_task_instance_try_number, anon_2.anon_3_anon_4_task_instance_task_id AS anon_2_anon_3_anon_4_task_instance_task_id, anon_2.anon_3_anon_4_task_instance_dag_id AS anon_2_anon_3_anon_4_task_instance_dag_id, anon_2.anon_3_anon_4_task_instance_execution_date AS anon_2_anon_3_anon_4_task_instance_execution_date, anon_2.anon_3_anon_4_task_instance_start_date AS anon_2_anon_3_anon_4_task_instance_start_date, anon_2.anon_3_anon_4_task_instance_end_date AS anon_2_anon_3_anon_4_task_instance_end_date, anon_2.anon_3_anon_4_task_instance_duration AS anon_2_anon_3_anon_4_task_instance_duration, anon_2.anon_3_anon_4_task_instance_state AS anon_2_anon_3_anon_4_task_instance_state, anon_2.anon_3_anon_4_task_instance_max_tries AS anon_2_anon_3_anon_4_task_instance_max_tries, anon_2.anon_3_anon_4_task_instance_hostname AS anon_2_anon_3_anon_4_task_instance_hostname, anon_2.anon_3_anon_4_task_instance_unixname AS anon_2_anon_3_anon_4_task_instance_unixname, anon_2.anon_3_anon_4_task_instance_job_id AS anon_2_anon_3_anon_4_task_instance_job_id, anon_2.anon_3_anon_4_task_instance_pool AS anon_2_anon_3_anon_4_task_instance_pool, anon_2.anon_3_anon_4_task_instance_pool_slots AS anon_2_anon_3_anon_4_task_instance_pool_slots, anon_2.anon_3_anon_4_task_instance_queue AS anon_2_anon_3_anon_4_task_instance_queue, anon_2.anon_3_anon_4_task_instance_priority_weight AS anon_2_anon_3_anon_4_task_instance_priority_weight, anon_2.anon_3_anon_4_task_instance_operator AS anon_2_anon_3_anon_4_task_instance_operator, anon_2.anon_3_anon_4_task_instance_queued_dttm AS anon_2_anon_3_anon_4_task_instance_queued_dttm, anon_2.anon_3_anon_4_task_instance_pid AS anon_2_anon_3_anon_4_task_instance_pid, anon_2.anon_3_anon_4_task_instance_executor_config AS anon_2_anon_3_anon_4_task_instance_executor_config
FROM (SELECT anon_3.anon_4_task_instance_try_number AS anon_3_anon_4_task_instance_try_number, anon_3.anon_4_task_instance_task_id AS anon_3_anon_4_task_instance_task_id, anon_3.anon_4_task_instance_dag_id AS anon_3_anon_4_task_instance_dag_id, anon_3.anon_4_task_instance_execution_date AS anon_3_anon_4_task_instance_execution_date, anon_3.anon_4_task_instance_start_date AS anon_3_anon_4_task_instance_start_date, anon_3.anon_4_task_instance_end_date AS anon_3_anon_4_task_instance_end_date, anon_3.anon_4_task_instance_duration AS anon_3_anon_4_task_instance_duration, anon_3.anon_4_task_instance_state AS anon_3_anon_4_task_instance_state, anon_3.anon_4_task_instance_max_tries AS anon_3_anon_4_task_instance_max_tries, anon_3.anon_4_task_instance_hostname AS anon_3_anon_4_task_instance_hostname, anon_3.anon_4_task_instance_unixname AS anon_3_anon_4_task_instance_unixname, anon_3.anon_4_task_instance_job_id AS anon_3_anon_4_task_instance_job_id, anon_3.anon_4_task_instance_pool AS anon_3_anon_4_task_instance_pool, anon_3.anon_4_task_instance_pool_slots AS anon_3_anon_4_task_instance_pool_slots, anon_3.anon_4_task_instance_queue AS anon_3_anon_4_task_instance_queue, anon_3.anon_4_task_instance_priority_weight AS anon_3_anon_4_task_instance_priority_weight, anon_3.anon_4_task_instance_operator AS anon_3_anon_4_task_instance_operator, anon_3.anon_4_task_instance_queued_dttm AS anon_3_anon_4_task_instance_queued_dttm, anon_3.anon_4_task_instance_pid AS anon_3_anon_4_task_instance_pid, anon_3.anon_4_task_instance_executor_config AS anon_3_anon_4_task_instance_executor_config
FROM (SELECT anon_4.task_instance_try_number AS anon_4_task_instance_try_number, anon_4.task_instance_task_id AS anon_4_task_instance_task_id, anon_4.task_instance_dag_id AS anon_4_task_instance_dag_id, anon_4.task_instance_execution_date AS anon_4_task_instance_execution_date, anon_4.task_instance_start_date AS anon_4_task_instance_start_date, anon_4.task_instance_end_date AS anon_4_task_instance_end_date, anon_4.task_instance_duration AS anon_4_task_instance_duration, anon_4.task_instance_state AS anon_4_task_instance_state, anon_4.task_instance_max_tries AS anon_4_task_instance_max_tries, anon_4.task_instance_hostname AS anon_4_task_instance_hostname, anon_4.task_instance_unixname AS anon_4_task_instance_unixname, anon_4.task_instance_job_id AS anon_4_task_instance_job_id, anon_4.task_instance_pool AS anon_4_task_instance_pool, anon_4.task_instance_pool_slots AS anon_4_task_instance_pool_slots, anon_4.task_instance_queue AS anon_4_task_instance_queue, anon_4.task_instance_priority_weight AS anon_4_task_instance_priority_weight, anon_4.task_instance_operator AS anon_4_task_instance_operator, anon_4.task_instance_queued_dttm AS anon_4_task_instance_queued_dttm, anon_4.task_instance_pid AS anon_4_task_instance_pid, anon_4.task_instance_executor_config AS anon_4_task_instance_executor_config
FROM (SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config
FROM task_instance
WHERE task_instance.dag_id LIKE %(dag_id_1)s AND task_instance.task_id IN (%(task_id_1)s, %(task_id_2)s, %(task_id_3)s, %(task_id_4)s) AND task_instance.execution_date >= %(execution_date_1)s AND task_instance.execution_date <= %(execution_date_2)s UNION SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config
FROM task_instance
WHERE task_instance.dag_id LIKE %(dag_id_2)s AND task_instance.task_id IN (%(task_id_5)s) AND task_instance.execution_date >= %(execution_date_3)s AND task_instance.execution_date <= %(execution_date_4)s) AS anon_4 UNION SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config
FROM task_instance
WHERE task_instance.dag_id LIKE %(dag_id_3)s AND task_instance.task_id IN (%(task_id_6)s) AND task_instance.execution_date >= %(execution_date_5)s AND task_instance.execution_date <= %(execution_date_6)s) AS anon_3 UNION SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config
FROM task_instance
WHERE task_instance.dag_id LIKE %(dag_id_4)s AND task_instance.task_id IN (%(task_id_7)s) AND task_instance.execution_date >= %(execution_date_7)s AND task_instance.execution_date <= %(execution_date_8)s) AS anon_2) AS anon_1
[2020-03-14 09:42:50,265] {base.py:1208} INFO - {'dag_id_1': 'agg_dag', 'task_id_1': 'start', 'task_id_2': 'daily_tas_0', 'task_id_3': 'daily_tas_1', 'task_id_4': 'daily_tas_2', 'execution_date_1': datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC")), 'execution_date_2': datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC")), 'dag_id_2': 'daily_dag', 'task_id_5': 'daily_tas', 'execution_date_3': datetime.datetime(2015, 1, 3, 0, 0, tzinfo=pendulum.timezone("UTC")), 'execution_date_4': datetime.datetime(2015, 1, 3, 0, 0, tzinfo=pendulum.timezone("UTC")), 'dag_id_3': 'daily_dag', 'task_id_6': 'daily_tas', 'execution_date_5': datetime.datetime(2015, 1, 2, 0, 0, tzinfo=pendulum.timezone("UTC")), 'execution_date_6': datetime.datetime(2015, 1, 2, 0, 0, tzinfo=pendulum.timezone("UTC")), 'dag_id_4': 'daily_dag', 'task_id_7': 'daily_tas', 'execution_date_7': datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC")), 'execution_date_8': datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC"))}
{code}
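The log above shows each UNION nested one subquery deeper than the last ({{anon_4}} inside {{anon_3}} inside {{anon_2}}), which is what repeated {{Query.union()}} calls produce. SQLAlchemy's {{Query.union()}} also accepts several queries in one call, which puts all branches into a single flat UNION. The sketch below (toy model, assumes SQLAlchemy 1.4 or later) only compares the rendered SQL of both forms. It is a complementary idea, not the fix proposed in this issue, and whether it alone would avoid the slow {{count()}} construction is untested here.

{code:python}
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class TaskInstance(Base):
    """Toy model standing in for Airflow's TaskInstance (assumption)."""

    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String)


# No engine is needed just to render the SQL text of each query.
session = Session()
queries = [
    session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id)
    for dag_id in ("agg_dag", "daily_dag_a", "daily_dag_b", "daily_dag_c")
]

# Chained calls: each .union() wraps the previous result in another subquery,
# giving the same kind of anon_N nesting that appears in the log.
nested = queries[0]
for query in queries[1:]:
    nested = nested.union(query)

# One call with several queries: all branches share a single subquery.
flat = queries[0].union(*queries[1:])

nested_sql, flat_sql = str(nested), str(flat)
print(nested_sql.count("FROM ("), flat_sql.count("FROM ("))
{code}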

  was:
When multiple {{ExternalTaskMarker}} are used, {{dag.clear()}} becomes very slow when clearing all the {{ExternalTaskMarker}} together.
 The slowness turns out to come from this line of code in {{dag.clear()}}:
{code:python}
        if dry_run:
            tis = tis.all()
            session.expunge_all()
            return tis

        count = tis.count()  # <------- This line is the culprit
        do_it = True
        if count == 0:
            return 0
{code}
This is the sql generated by {{tis.count()}} when there are three {{ExternalTaskMarker}} being cleared together. Note there's nothing wrong with the sql and it's reasonably efficient when executed on postgres even when the number of UNION statements is bigger (e.g. 30 UNION statements takes about 13ms in the docker container I started with breeze)
 But it takes more than three minutes for sqlalchemy to construct this count query before it goes to the database.

 The fix is really simple, just get rid of the count() and query all the entries from the db instead. The function becomes ten times faster when {{tis.count()}} is removed.

 There are multiple places people are complaining about similar problems with sqlalchemy count() being slower than the query itself. It does not look like sqlalchemy is going to fix this issue:
https://stackoverflow.com/questions/14754994/why-is-sqlalchemy-count-much-slower-than-the-raw-query
https://gist.github.com/hest/8798884

 


> dag.clear() slowness caused by multiple UNION statements and tis.count()
> ------------------------------------------------------------------------
>
>                 Key: AIRFLOW-7063
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-7063
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: webserver
>    Affects Versions: 1.10.9
>            Reporter: Qian Yu
>            Assignee: Qian Yu
>            Priority: Major
>
>  
> {code:sql}
> [2020-03-14 09:42:50,264] {base.py:1203} INFO - SELECT count(*) AS count_1
> FROM (SELECT anon_2.anon_3_anon_4_task_instance_try_number AS anon_2_anon_3_anon_4_task_instance_try_number, anon_2.anon_3_anon_4_task_instance_task_id AS anon_2_anon_3_anon_4_task_instance_task_id, anon_2.anon_3_anon_4_task_instance_dag_id AS anon_2_anon_3_anon_4_task_instance_dag_id, anon_2.anon_3_anon_4_task_instance_execution_date AS anon_2_anon_3_anon_4_task_instance_execution_date, anon_2.anon_3_anon_4_task_instance_start_date AS anon_2_anon_3_anon_4_task_instance_start_date, anon_2.anon_3_anon_4_task_instance_end_date AS anon_2_anon_3_anon_4_task_instance_end_date, anon_2.anon_3_anon_4_task_instance_duration AS anon_2_anon_3_anon_4_task_instance_duration, anon_2.anon_3_anon_4_task_instance_state AS anon_2_anon_3_anon_4_task_instance_state, anon_2.anon_3_anon_4_task_instance_max_tries AS anon_2_anon_3_anon_4_task_instance_max_tries, anon_2.anon_3_anon_4_task_instance_hostname AS anon_2_anon_3_anon_4_task_instance_hostname, anon_2.anon_3_anon_4_task_instance_unixname AS anon_2_anon_3_anon_4_task_instance_unixname, anon_2.anon_3_anon_4_task_instance_job_id AS anon_2_anon_3_anon_4_task_instance_job_id, anon_2.anon_3_anon_4_task_instance_pool AS anon_2_anon_3_anon_4_task_instance_pool, anon_2.anon_3_anon_4_task_instance_pool_slots AS anon_2_anon_3_anon_4_task_instance_pool_slots, anon_2.anon_3_anon_4_task_instance_queue AS anon_2_anon_3_anon_4_task_instance_queue, anon_2.anon_3_anon_4_task_instance_priority_weight AS anon_2_anon_3_anon_4_task_instance_priority_weight, anon_2.anon_3_anon_4_task_instance_operator AS anon_2_anon_3_anon_4_task_instance_operator, anon_2.anon_3_anon_4_task_instance_queued_dttm AS anon_2_anon_3_anon_4_task_instance_queued_dttm, anon_2.anon_3_anon_4_task_instance_pid AS anon_2_anon_3_anon_4_task_instance_pid, anon_2.anon_3_anon_4_task_instance_executor_config AS anon_2_anon_3_anon_4_task_instance_executor_config
> FROM (SELECT anon_3.anon_4_task_instance_try_number AS anon_3_anon_4_task_instance_try_number, anon_3.anon_4_task_instance_task_id AS anon_3_anon_4_task_instance_task_id, anon_3.anon_4_task_instance_dag_id AS anon_3_anon_4_task_instance_dag_id, anon_3.anon_4_task_instance_execution_date AS anon_3_anon_4_task_instance_execution_date, anon_3.anon_4_task_instance_start_date AS anon_3_anon_4_task_instance_start_date, anon_3.anon_4_task_instance_end_date AS anon_3_anon_4_task_instance_end_date, anon_3.anon_4_task_instance_duration AS anon_3_anon_4_task_instance_duration, anon_3.anon_4_task_instance_state AS anon_3_anon_4_task_instance_state, anon_3.anon_4_task_instance_max_tries AS anon_3_anon_4_task_instance_max_tries, anon_3.anon_4_task_instance_hostname AS anon_3_anon_4_task_instance_hostname, anon_3.anon_4_task_instance_unixname AS anon_3_anon_4_task_instance_unixname, anon_3.anon_4_task_instance_job_id AS anon_3_anon_4_task_instance_job_id, anon_3.anon_4_task_instance_pool AS anon_3_anon_4_task_instance_pool, anon_3.anon_4_task_instance_pool_slots AS anon_3_anon_4_task_instance_pool_slots, anon_3.anon_4_task_instance_queue AS anon_3_anon_4_task_instance_queue, anon_3.anon_4_task_instance_priority_weight AS anon_3_anon_4_task_instance_priority_weight, anon_3.anon_4_task_instance_operator AS anon_3_anon_4_task_instance_operator, anon_3.anon_4_task_instance_queued_dttm AS anon_3_anon_4_task_instance_queued_dttm, anon_3.anon_4_task_instance_pid AS anon_3_anon_4_task_instance_pid, anon_3.anon_4_task_instance_executor_config AS anon_3_anon_4_task_instance_executor_config
> FROM (SELECT anon_4.task_instance_try_number AS anon_4_task_instance_try_number, anon_4.task_instance_task_id AS anon_4_task_instance_task_id, anon_4.task_instance_dag_id AS anon_4_task_instance_dag_id, anon_4.task_instance_execution_date AS anon_4_task_instance_execution_date, anon_4.task_instance_start_date AS anon_4_task_instance_start_date, anon_4.task_instance_end_date AS anon_4_task_instance_end_date, anon_4.task_instance_duration AS anon_4_task_instance_duration, anon_4.task_instance_state AS anon_4_task_instance_state, anon_4.task_instance_max_tries AS anon_4_task_instance_max_tries, anon_4.task_instance_hostname AS anon_4_task_instance_hostname, anon_4.task_instance_unixname AS anon_4_task_instance_unixname, anon_4.task_instance_job_id AS anon_4_task_instance_job_id, anon_4.task_instance_pool AS anon_4_task_instance_pool, anon_4.task_instance_pool_slots AS anon_4_task_instance_pool_slots, anon_4.task_instance_queue AS anon_4_task_instance_queue, anon_4.task_instance_priority_weight AS anon_4_task_instance_priority_weight, anon_4.task_instance_operator AS anon_4_task_instance_operator, anon_4.task_instance_queued_dttm AS anon_4_task_instance_queued_dttm, anon_4.task_instance_pid AS anon_4_task_instance_pid, anon_4.task_instance_executor_config AS anon_4_task_instance_executor_config
> FROM (SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_1)s AND task_instance.task_id IN (%(task_id_1)s, %(task_id_2)s, %(task_id_3)s, %(task_id_4)s) AND task_instance.execution_date >= %(execution_date_1)s AND task_instance.execution_date <= %(execution_date_2)s UNION SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_2)s AND task_instance.task_id IN (%(task_id_5)s) AND task_instance.execution_date >= %(execution_date_3)s AND task_instance.execution_date <= %(execution_date_4)s) AS anon_4 UNION SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_3)s AND task_instance.task_id IN (%(task_id_6)s) AND task_instance.execution_date >= %(execution_date_5)s AND task_instance.execution_date <= %(execution_date_6)s) AS anon_3 UNION SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_4)s AND task_instance.task_id IN (%(task_id_7)s) AND task_instance.execution_date >= %(execution_date_7)s AND task_instance.execution_date <= %(execution_date_8)s) AS anon_2) AS anon_1
> [2020-03-14 09:42:50,265] {base.py:1208} INFO - {'dag_id_1': 'agg_dag', 'task_id_1': 'start', 'task_id_2': 'daily_tas_0', 'task_id_3': 'daily_tas_1', 'task_id_4': 'daily_tas_2', 'execution_date_1': datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC")), 'execution_date_2': datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC")), 'dag_id_2': 'daily_dag', 'task_id_5': 'daily_tas', 'execution_date_3': datetime.datetime(2015, 1, 3, 0, 0, tzinfo=pendulum.timezone("UTC")), 'execution_date_4': datetime.datetime(2015, 1, 3, 0, 0, tzinfo=pendulum.timezone("UTC")), 'dag_id_3': 'daily_dag', 'task_id_6': 'daily_tas', 'execution_date_5': datetime.datetime(2015, 1, 2, 0, 0, tzinfo=pendulum.timezone("UTC")), 'execution_date_6': datetime.datetime(2015, 1, 2, 0, 0, tzinfo=pendulum.timezone("UTC")), 'dag_id_4': 'daily_dag', 'task_id_7': 'daily_tas', 'execution_date_7': datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC")), 'execution_date_8': datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC"))}
> {code}
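The logged query has a regular shape. Each chained union wraps the previous result in a new anonymous subquery (anon_4, then anon_3, then anon_2). Every column is re-labelled with that subquery's alias as a prefix, and count() adds one final wrapper (anon_1). Below is a toy, stdlib-only model of that shape. It is not SQLAlchemy code. The helper names branch_sql and chained_union_count_sql are hypothetical, and the WHERE clauses are placeholders. The model makes no claim about where SQLAlchemy actually spends its time. It only shows that the text of such a query grows faster than linearly with the number of branches, because the alias prefixes accumulate on every label.

```python
# Toy model only (hypothetical helpers, no SQLAlchemy involved): it rebuilds the
# *shape* of the logged count query so the nesting and label growth are visible.

# Column names exactly as they appear in the logged SELECT lists.
COLUMNS = [
    "try_number", "task_id", "dag_id", "execution_date", "start_date",
    "end_date", "duration", "state", "max_tries", "hostname", "unixname",
    "job_id", "pool", "pool_slots", "queue", "priority_weight", "operator",
    "queued_dttm", "pid", "executor_config",
]


def branch_sql(cond: str) -> str:
    """One SELECT over task_instance, labelled like the innermost log branches."""
    cols = ", ".join(f"task_instance.{c} AS task_instance_{c}" for c in COLUMNS)
    return f"SELECT {cols}\nFROM task_instance\nWHERE {cond}"


def chained_union_count_sql(conds: list) -> str:
    """Mimic the text of q0.union(q1).union(q2)...count() for the WHERE clauses.

    Each union wraps the previous result in a new anonymous subquery and
    re-labels every column with that subquery's alias as a prefix. As in the
    log, aliases are numbered outside-in, so the innermost one is highest.
    """
    n = len(conds)
    labels = [f"task_instance_{c}" for c in COLUMNS]
    sql = branch_sql(conds[0])
    for step, cond in enumerate(conds[1:]):
        alias = f"anon_{n - step}"  # first union -> anon_<n>, last -> anon_2
        union = f"{sql} UNION {branch_sql(cond)}"
        cols = ", ".join(f"{alias}.{lab} AS {alias}_{lab}" for lab in labels)
        labels = [f"{alias}_{lab}" for lab in labels]  # prefixes accumulate
        sql = f"SELECT {cols}\nFROM ({union}) AS {alias}"
    # count() adds one more wrapper around the whole nested union.
    return f"SELECT count(*) AS count_1\nFROM ({sql}) AS anon_1"


if __name__ == "__main__":
    for n in range(1, 8):
        conds = [f"task_instance.dag_id = 'dag_{i}'" for i in range(n)]
        print(n, "branches ->", len(chained_union_count_sql(conds)), "characters")
```

With four branches, the model reproduces the structure seen in the log, ending in `) AS anon_2) AS anon_1`. The printed sizes grow by a larger increment at each step, because labels such as anon_3_anon_4_task_instance_try_number get longer at every nesting level.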



--
This message was sent by Atlassian Jira
(v8.3.4#803005)