You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/08 00:16:24 UTC

[airflow] 03/07: Filter celery stuck task query to exclude completed tasks (#21335)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 317dd298e02f54156f362f8d39307e575106b5b5
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Mon Feb 7 10:43:51 2022 +0100

    Filter celery stuck task query to exclude completed tasks (#21335)
    
    While testing #19769, a spike in CPU usage was reported:
    https://github.com/apache/airflow/pull/19769#issuecomment-1029755436
    Hopefully, filtering the stuck-task query to skip tasks that have already completed (SUCCESS or FAILURE) will fix it.
    
    (cherry picked from commit a49224fa7ce45e9765c0d752edc30430e0d3ce14)
---
 airflow/executors/celery_executor.py    | 7 ++++++-
 tests/executors/test_celery_executor.py | 8 ++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 8daced6..9185f1b 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -404,7 +404,12 @@ class CeleryExecutor(BaseExecutor):
         session_ = app.backend.ResultSession()
         task_cls = getattr(app.backend, "task_cls", TaskDb)
         with session_cleanup(session_):
-            celery_task_ids = [t.task_id for t in session_.query(task_cls.task_id).all()]
+            celery_task_ids = [
+                t.task_id
+                for t in session_.query(task_cls.task_id)
+                .filter(~task_cls.status.in_([celery_states.SUCCESS, celery_states.FAILURE]))
+                .all()
+            ]
         self.log.debug("Checking for stuck queued tasks")
 
         max_allowed_time = utcnow() - self.task_adoption_timeout
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 5632f7d..f057440 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -42,7 +42,7 @@ from parameterized import parameterized
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowTaskTimeout
 from airflow.executors import celery_executor
-from airflow.executors.celery_executor import BulkStateFetcher
+from airflow.executors.celery_executor import BulkStateFetcher, CeleryExecutor
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DAG
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
@@ -532,6 +532,7 @@ class TestCeleryExecutor:
                 assert ti.state == state
 
     @mock.patch("celery.backends.database.DatabaseBackend.ResultSession")
+    @mock.patch.object(CeleryExecutor, "update_all_task_states")
     @pytest.mark.backend("mysql", "postgres")
     @freeze_time("2020-01-01")
     @pytest.mark.parametrize(
@@ -543,6 +544,7 @@ class TestCeleryExecutor:
     )
     def test_the_check_interval_to_clear_stuck_queued_task_is_correct_for_db_query(
         self,
+        mock_update_all_task_states,
         mock_result_session,
         task_id,
         state,
@@ -557,7 +559,9 @@ class TestCeleryExecutor:
             mock_backend = DatabaseBackend(app=celery_executor.app, url="sqlite3://")
             with mock.patch('airflow.executors.celery_executor.Celery.backend', mock_backend):
                 mock_session = mock_backend.ResultSession.return_value
-                mock_session.query.return_value.all.return_value = [result_obj("SUCCESS", task_id)]
+                mock_session.query.return_value.filter.return_value.all.return_value = [
+                    result_obj("SUCCESS", task_id)
+                ]
 
                 last_check_time = time.time() - 302  # should clear ti state