You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/08 00:16:24 UTC
[airflow] 03/07: Filter celery stuck task query to exclude completed tasks (#21335)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 317dd298e02f54156f362f8d39307e575106b5b5
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Mon Feb 7 10:43:51 2022 +0100
Filter celery stuck task query to exclude completed tasks (#21335)
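While testing #19769, a spike in CPU usage was reported: https://github.com/apache/airflow/pull/19769#issuecomment-1029755436
This change should reduce that load by excluding tasks that Celery has already completed (SUCCESS or FAILURE) from the stuck-queued-task query.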
(cherry picked from commit a49224fa7ce45e9765c0d752edc30430e0d3ce14)
---
airflow/executors/celery_executor.py | 7 ++++++-
tests/executors/test_celery_executor.py | 8 ++++++--
2 files changed, 12 insertions(+), 3 deletions(-)
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 8daced6..9185f1b 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -404,7 +404,12 @@ class CeleryExecutor(BaseExecutor):
session_ = app.backend.ResultSession()
task_cls = getattr(app.backend, "task_cls", TaskDb)
with session_cleanup(session_):
- celery_task_ids = [t.task_id for t in session_.query(task_cls.task_id).all()]
+ celery_task_ids = [
+ t.task_id
+ for t in session_.query(task_cls.task_id)
+ .filter(~task_cls.status.in_([celery_states.SUCCESS, celery_states.FAILURE]))
+ .all()
+ ]
self.log.debug("Checking for stuck queued tasks")
max_allowed_time = utcnow() - self.task_adoption_timeout
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 5632f7d..f057440 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -42,7 +42,7 @@ from parameterized import parameterized
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowTaskTimeout
from airflow.executors import celery_executor
-from airflow.executors.celery_executor import BulkStateFetcher
+from airflow.executors.celery_executor import BulkStateFetcher, CeleryExecutor
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
@@ -532,6 +532,7 @@ class TestCeleryExecutor:
assert ti.state == state
@mock.patch("celery.backends.database.DatabaseBackend.ResultSession")
+ @mock.patch.object(CeleryExecutor, "update_all_task_states")
@pytest.mark.backend("mysql", "postgres")
@freeze_time("2020-01-01")
@pytest.mark.parametrize(
@@ -543,6 +544,7 @@ class TestCeleryExecutor:
)
def test_the_check_interval_to_clear_stuck_queued_task_is_correct_for_db_query(
self,
+ mock_update_all_task_states,
mock_result_session,
task_id,
state,
@@ -557,7 +559,9 @@ class TestCeleryExecutor:
mock_backend = DatabaseBackend(app=celery_executor.app, url="sqlite3://")
with mock.patch('airflow.executors.celery_executor.Celery.backend', mock_backend):
mock_session = mock_backend.ResultSession.return_value
- mock_session.query.return_value.all.return_value = [result_obj("SUCCESS", task_id)]
+ mock_session.query.return_value.filter.return_value.all.return_value = [
+ result_obj("SUCCESS", task_id)
+ ]
last_check_time = time.time() - 302 # should clear ti state