You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/19 15:06:32 UTC
[airflow] 30/42: Default to Celery Task model when backend model
does not exist (#14612)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0fc6ca33fd698c123875454b64299b9af4dd4877
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Mar 5 19:34:17 2021 +0000
Default to Celery Task model when backend model does not exist (#14612)
closes https://github.com/apache/airflow/issues/14586
We add this feature in https://github.com/apache/airflow/pull/12336
but looks like for some users this attribute does not exist.
I am not sure if they are using a different Celery DB Backend or not
but even Celery > 5 contains this attribute
(https://github.com/celery/celery/blob/v5.0.5/celery/backends/database/__init__.py#L66)
and even Celery 4 but this commits use the Celery Task model when an attribute
error occurs
(cherry picked from commit 33910d6c699b5528db4be40d31199626dafed912)
---
airflow/executors/celery_executor.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 8bbaed1..a670294 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -35,7 +35,7 @@ from typing import Any, Dict, List, Mapping, MutableMapping, Optional, Set, Tupl
from celery import Celery, Task, states as celery_states
from celery.backends.base import BaseKeyValueStoreBackend
-from celery.backends.database import DatabaseBackend, session_cleanup
+from celery.backends.database import DatabaseBackend, Task as TaskDb, session_cleanup
from celery.result import AsyncResult
from celery.signals import import_modules as celery_import_modules
from setproctitle import setproctitle # pylint: disable=no-name-in-module
@@ -567,7 +567,7 @@ class BulkStateFetcher(LoggingMixin):
def _get_many_from_db_backend(self, async_tasks) -> Mapping[str, EventBufferValueType]:
task_ids = _tasks_list_to_task_ids(async_tasks)
session = app.backend.ResultSession()
- task_cls = app.backend.task_cls
+ task_cls = getattr(app.backend, "task_cls", TaskDb)
with session_cleanup(session):
tasks = session.query(task_cls).filter(task_cls.task_id.in_(task_ids)).all()