You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/02/08 12:47:18 UTC
[GitHub] [airflow] yuqian90 edited a comment on pull request #14048: Speed up clear_task_instances by doing a single sql delete for TaskReschedule
yuqian90 edited a comment on pull request #14048:
URL: https://github.com/apache/airflow/pull/14048#issuecomment-775124153
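Hi @kaxil, this is the experiment I ran. `test_create_large_dag_with_task_reschedule` creates many `TaskInstance` and `TaskReschedule` rows in the DB (1000 tasks × 10 DAG runs × 5 reschedules = 50,000 `TaskReschedule` rows). `test_clear_large_dag_with_task_reschedule` then measures how long `clear_task_instances` takes on them. The second test relies on the rows committed by the first, so the two must run in that order.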
```python
import pytest
@pytest.fixture(scope="module")
def big_dag():
    """Provide one DAG, shared by the whole module, containing 1000 sensor tasks.

    Every task is a ``PythonSensor`` in ``mode="reschedule"`` whose callable
    always returns ``False``. The DAG spans 30 days starting at DEFAULT_DATE.
    """
    last_day = DEFAULT_DATE + datetime.timedelta(days=30)
    with DAG(
        'test_create_large_dag_with_task_reschedule',
        start_date=DEFAULT_DATE,
        end_date=last_day,
    ) as dag:
        # Operators created inside the ``with`` block attach themselves to ``dag``.
        for task_no in range(1000):
            PythonSensor(
                task_id=f'task_{task_no}',
                python_callable=lambda: False,
                mode="reschedule",
            )
    yield dag
import copy
def test_create_large_dag_with_task_reschedule(big_dag):
    """Seed the database with TaskReschedule rows for every task of ``big_dag``.

    Creates 10 daily DagRuns starting at DEFAULT_DATE. It builds one (unsaved)
    TaskInstance per task per run, then bulk-inserts 5 TaskReschedule rows
    (try_number=1) per TaskInstance using Core inserts of up to 10000 rows each.
    The rows are committed so that a later test can read them.
    """
    import pendulum

    task_instances = []
    for day in range(10):
        run_date = DEFAULT_DATE + datetime.timedelta(days=day)
        big_dag.create_dagrun(
            execution_date=run_date,
            state=State.RUNNING,
            run_type=DagRunType.SCHEDULED,
        )
        # A shallow copy of each task gives every TaskInstance its own task object.
        task_instances.extend(
            TI(task=copy.copy(task), execution_date=run_date) for task in big_dag.tasks
        )

    reschedules = [
        TaskReschedule(
            task=ti.task,
            execution_date=ti.execution_date,
            try_number=1,
            start_date=pendulum.now(),
            end_date=pendulum.now(),
            reschedule_date=pendulum.now(),
        )
        for ti in task_instances
        for _ in range(5)
    ]

    batch_size = 10000
    with create_session() as session:
        # Plain dicts let us use a Core executemany insert instead of adding
        # tens of thousands of ORM objects to the session.
        rows = [
            {
                "task_id": tr.task_id,
                "dag_id": tr.dag_id,
                "execution_date": tr.execution_date,
                "try_number": tr.try_number,
                "start_date": tr.start_date,
                "end_date": tr.end_date,
                "duration": tr.duration,
                "reschedule_date": tr.reschedule_date,
            }
            for tr in reschedules
        ]
        insert_stmt = TaskReschedule.__table__.insert()
        for offset in range(0, len(rows), batch_size):
            session.execute(insert_stmt, rows[offset:offset + batch_size])
        session.commit()
def test_clear_large_dag_with_task_reschedule(big_dag):
    """Time ``clear_task_instances`` over every TaskInstance of ``big_dag``.

    This test depends on ``test_create_large_dag_with_task_reschedule`` having
    already committed 1000 tasks x 10 runs x 5 = 50,000 TaskReschedule rows
    with try_number=1. It asserts that the clear removes all of them, then
    rolls the session back.
    """
    import time

    expected_rows = 1000 * 10 * 5

    with create_session() as session:

        def count_task_reschedule():
            """Count this DAG's try_number=1 TaskReschedule rows visible to ``session``."""
            return (
                session.query(TaskReschedule)
                .filter(
                    TaskReschedule.dag_id == big_dag.dag_id,
                    TaskReschedule.try_number == 1,
                )
                .count()
            )

        assert count_task_reschedule() == expected_rows
        tis = session.query(TI).filter(TI.dag_id == big_dag.dag_id).all()

        # perf_counter() is monotonic and high-resolution. time.time() is wall-clock
        # time and can jump (e.g. NTP adjustments), which skews durations.
        start = time.perf_counter()
        clear_task_instances(tis, session, dag=big_dag)
        elapsed = time.perf_counter() - start
        print(f"clear_task_instances took {elapsed}")

        assert count_task_reschedule() == 0
        # NOTE(review): presumably discards the deletions so the seeded rows survive
        # for another run; this assumes clear_task_instances does not commit - verify.
        session.rollback()
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org