You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/13 18:20:34 UTC

[airflow] 14/38: Fix slow (cleared) tasks being adopted by Celery worker. (#16718)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3211672835cb13e305305ff956a524a118196175
Author: Jorrick Sleijster <jo...@gmail.com>
AuthorDate: Sat Jul 3 00:02:01 2021 +0200

    Fix slow (cleared) tasks being adopted by Celery worker. (#16718)
    
    Celery executor is currently adopting anything that has ever run before and has been cleared since then.
    
    **Example of the issue:**
    We have a DAG that runs over 150 sensor tasks and 50 ETL tasks with a concurrency of 3 and max_active_runs of 16. This setup is required because we want to divide the resources and don't want this DAG to take up all of them. As a result, many tasks will sit in the scheduled state for a while, since they can't be queued due to the concurrency limit of 3. However, because of the current implementation, if these tasks had ever run before, they would get adopted by the schedulers execut [...]
    
    **Contents of the PR**:
    1. Tasks in the scheduled state should never have reached an executor. Hence, we remove the scheduled state from the set of states eligible for adoption.
    2. Given that a task instance's `external_executor_id` is quite important in deciding whether it is adopted, we also reset it when we reset the state of the TaskInstance.
    
    (cherry picked from commit 554a23928efb4ff1d87d115ae2664edec3a9408c)
---
 airflow/jobs/scheduler_job.py           |  2 +-
 airflow/models/taskinstance.py          |  1 +
 tests/executors/test_celery_executor.py |  2 ++
 tests/jobs/test_scheduler_job.py        | 14 +++++++-------
 tests/models/test_cleartasks.py         | 24 ++++++++++++++++++++++++
 5 files changed, 35 insertions(+), 8 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 1758ae1..fe8e0b0 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1859,7 +1859,7 @@ class SchedulerJob(BaseJob):
                         self.log.info("Marked %d SchedulerJob instances as failed", num_failed)
                         Stats.incr(self.__class__.__name__.lower() + '_end', num_failed)
 
-                    resettable_states = [State.SCHEDULED, State.QUEUED, State.RUNNING]
+                    resettable_states = [State.QUEUED, State.RUNNING]
                     query = (
                         session.query(TI)
                         .filter(TI.state.in_(resettable_states))
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 76cb079..aeb2a22 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -170,6 +170,7 @@ def clear_task_instances(
                 # original max_tries or the last attempted try number.
                 ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries)
             ti.state = State.NONE
+            ti.external_executor_id = None
             session.merge(ti)
 
         task_id_by_key[ti.dag_id][ti.execution_date][ti.try_number].add(ti.task_id)
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index e24a3dd..88ea95c 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -329,9 +329,11 @@ class TestCeleryExecutor(unittest.TestCase):
         ti1 = TaskInstance(task=task_1, execution_date=exec_date)
         ti1.external_executor_id = '231'
         ti1.queued_dttm = queued_dttm
+        ti1.state = State.QUEUED
         ti2 = TaskInstance(task=task_2, execution_date=exec_date)
         ti2.external_executor_id = '232'
         ti2.queued_dttm = queued_dttm
+        ti2.state = State.QUEUED
 
         tis = [ti1, ti2]
         executor = celery_executor.CeleryExecutor()
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 37ae65b..fe0b257 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2124,9 +2124,9 @@ class TestSchedulerJob(unittest.TestCase):
             session=session,
         )
         ti = dr.get_task_instance(task_id=op1.task_id, session=session)
-        ti.state = State.SCHEDULED
+        ti.state = State.QUEUED
         ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
-        ti2.state = State.SCHEDULED
+        ti2.state = State.QUEUED
         session.commit()
 
         processor = mock.MagicMock()
@@ -2140,7 +2140,7 @@ class TestSchedulerJob(unittest.TestCase):
         assert ti.state == State.NONE
 
         ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
-        assert ti2.state == State.SCHEDULED, "Tasks run by Backfill Jobs should not be reset"
+        assert ti2.state == State.QUEUED, "Tasks run by Backfill Jobs should not be reset"
 
     @parameterized.expand(
         [
@@ -3654,7 +3654,7 @@ class TestSchedulerJob(unittest.TestCase):
             session=session,
         )
         ti = dr1.get_task_instances(session=session)[0]
-        ti.state = State.SCHEDULED
+        ti.state = State.QUEUED
         session.merge(ti)
         session.merge(dr1)
         session.commit()
@@ -3799,12 +3799,12 @@ class TestSchedulerJob(unittest.TestCase):
 
         ti1, ti2 = dr1.get_task_instances(session=session)
         dr1.state = State.RUNNING
-        ti1.state = State.SCHEDULED
+        ti1.state = State.QUEUED
         ti1.queued_by_job_id = old_job.id
         session.merge(dr1)
         session.merge(ti1)
 
-        ti2.state = State.SCHEDULED
+        ti2.state = State.QUEUED
         ti2.queued_by_job_id = self.scheduler_job.id
         session.merge(ti2)
         session.flush()
@@ -3816,7 +3816,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.refresh(ti1)
         assert ti1.state is None
         session.refresh(ti2)
-        assert State.SCHEDULED == ti2.state
+        assert ti2.state == State.QUEUED
         session.rollback()
         if old_job.processor_agent:
             old_job.processor_agent.end()
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index 1c5606e..9b8fbd0 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -56,6 +56,7 @@ class TestClearTasks(unittest.TestCase):
 
         ti0.run()
         ti1.run()
+
         with create_session() as session:
             qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
             clear_task_instances(qry, session, dag=dag)
@@ -68,6 +69,29 @@ class TestClearTasks(unittest.TestCase):
         assert ti1.try_number == 2
         assert ti1.max_tries == 3
 
+    def test_clear_task_instances_external_executor_id(self):
+        dag = DAG(
+            'test_clear_task_instances_external_executor_id',
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+        )
+        task0 = DummyOperator(task_id='task0', owner='test', dag=dag)
+        ti0 = TI(task=task0, execution_date=DEFAULT_DATE)
+        ti0.state = State.SUCCESS
+        ti0.external_executor_id = "some_external_executor_id"
+
+        with create_session() as session:
+            session.add(ti0)
+            session.commit()
+
+            qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
+            clear_task_instances(qry, session, dag=dag)
+
+            ti0.refresh_from_db()
+
+            assert ti0.state is None
+            assert ti0.external_executor_id is None
+
     def test_clear_task_instances_without_task(self):
         dag = DAG(
             'test_clear_task_instances_without_task',