You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/02/08 12:47:18 UTC

[GitHub] [airflow] yuqian90 edited a comment on pull request #14048: Speed up clear_task_instances by doing a single sql delete for TaskReschedule

yuqian90 edited a comment on pull request #14048:
URL: https://github.com/apache/airflow/pull/14048#issuecomment-775124153


   Hi @kaxil, here is the experiment I ran. `test_create_large_dag_with_task_reschedule` creates a large number of `TaskInstance` and `TaskReschedule` rows in the DB. Then `test_clear_large_dag_with_task_reschedule` times how long it takes to call `clear_task_instances` on them.
   
   ```python
   import pytest
   
   @pytest.fixture(scope="module")
   def big_dag():
       """Yield one DAG, shared across this module, of 1000 reschedule-mode sensors.

       Every sensor's callable returns ``False``, so the sensors never succeed.
       The DAG spans 30 days starting at ``DEFAULT_DATE``.
       """
       window_end = DEFAULT_DATE + datetime.timedelta(days=30)
       with DAG(
           'test_create_large_dag_with_task_reschedule',
           start_date=DEFAULT_DATE,
           end_date=window_end,
       ) as dag:
           for task_index in range(1000):
               PythonSensor(
                   task_id=f'task_{task_index}',
                   python_callable=lambda: False,
                   mode="reschedule",
               )

       yield dag
   
   import copy
   
   def test_create_large_dag_with_task_reschedule(big_dag):
       """Populate the DB with DagRuns and ``TaskReschedule`` rows for ``big_dag``.

       Creates one RUNNING, SCHEDULED DagRun per day for 10 days. It then
       inserts 5 ``TaskReschedule`` rows (``try_number=1``) for every task of
       every run, using the table's Core ``insert()`` in chunks of 10000 rows.
       """
       import pendulum

       tis = []
       for day in range(10):
           execution_date = DEFAULT_DATE + datetime.timedelta(days=day)
           big_dag.create_dagrun(
               execution_date=execution_date,
               state=State.RUNNING,
               run_type=DagRunType.SCHEDULED,
           )
           for task in big_dag.tasks:
               tis.append(TI(task=copy.copy(task), execution_date=execution_date))

       # Five reschedule records per task instance; the repeat counter itself is unused.
       reschedules = [
           TaskReschedule(
               task=ti.task,
               execution_date=ti.execution_date,
               try_number=1,
               start_date=pendulum.now(),
               end_date=pendulum.now(),
               reschedule_date=pendulum.now(),
           )
           for ti in tis
           for _ in range(5)
       ]

       # Plain column dicts, so the rows can go through TaskReschedule.__table__.insert()
       # instead of being added to the session as ORM objects.
       rows = [
           {
               "task_id": tr.task_id,
               "dag_id": tr.dag_id,
               "execution_date": tr.execution_date,
               "try_number": tr.try_number,
               "start_date": tr.start_date,
               "end_date": tr.end_date,
               "duration": tr.duration,
               "reschedule_date": tr.reschedule_date,
           }
           for tr in reschedules
       ]

       chunk_size = 10000
       with create_session() as session:
           # Insert and commit in fixed-size slices rather than as one huge statement.
           for offset in range(0, len(rows), chunk_size):
               session.execute(TaskReschedule.__table__.insert(), rows[offset:offset + chunk_size])
               session.commit()
   
   def test_clear_large_dag_with_task_reschedule(big_dag):
       """Time ``clear_task_instances`` over every TaskInstance of ``big_dag``.

       Depends on ``test_create_large_dag_with_task_reschedule`` having already
       populated the DB in an earlier test of this module. The test asserts
       that all ``try_number == 1`` ``TaskReschedule`` rows for the DAG are
       gone after clearing.
       """
       import time

       # 5 reschedule rows per task for each of the 10 DagRuns created by the create test.
       expected_reschedules = len(big_dag.tasks) * 10 * 5

       with create_session() as session:
           def count_task_reschedule():
               return (
                   session.query(TaskReschedule)
                   .filter(
                       TaskReschedule.dag_id == big_dag.dag_id,
                       TaskReschedule.try_number == 1,
                   )
                   .count()
               )

           assert count_task_reschedule() == expected_reschedules
           tis = session.query(TI).filter(TI.dag_id == big_dag.dag_id).all()
           # perf_counter is monotonic and high-resolution, which makes it the
           # right clock for an elapsed-time measurement (time.time() follows
           # the wall clock and can jump).
           start = time.perf_counter()
           clear_task_instances(tis, session, dag=big_dag)
           elapsed = time.perf_counter() - start
           print(f"clear_task_instances took {elapsed}")
           assert count_task_reschedule() == 0
           # Discard this session's uncommitted deletions so the inserted rows survive
           # (assumes clear_task_instances does not commit on its own — TODO confirm).
           session.rollback()
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org