You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/19 15:06:32 UTC

[airflow] 30/42: Default to Celery Task model when backend model does not exist (#14612)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0fc6ca33fd698c123875454b64299b9af4dd4877
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Mar 5 19:34:17 2021 +0000

    Default to Celery Task model when backend model does not exist (#14612)
    
    closes https://github.com/apache/airflow/issues/14586
    
    We add this feature in https://github.com/apache/airflow/pull/12336
    but looks like for some users this attribute does not exist.
    
    I am not sure if they are using a different Celery DB Backend or not
    but even Celery > 5 contains this attribute
    (https://github.com/celery/celery/blob/v5.0.5/celery/backends/database/__init__.py#L66)
    
    and even Celery 4 but this commits use the Celery Task model when an attribute
    error occurs
    
    (cherry picked from commit 33910d6c699b5528db4be40d31199626dafed912)
---
 airflow/executors/celery_executor.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 8bbaed1..a670294 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -35,7 +35,7 @@ from typing import Any, Dict, List, Mapping, MutableMapping, Optional, Set, Tupl
 
 from celery import Celery, Task, states as celery_states
 from celery.backends.base import BaseKeyValueStoreBackend
-from celery.backends.database import DatabaseBackend, session_cleanup
+from celery.backends.database import DatabaseBackend, Task as TaskDb, session_cleanup
 from celery.result import AsyncResult
 from celery.signals import import_modules as celery_import_modules
 from setproctitle import setproctitle  # pylint: disable=no-name-in-module
@@ -567,7 +567,7 @@ class BulkStateFetcher(LoggingMixin):
     def _get_many_from_db_backend(self, async_tasks) -> Mapping[str, EventBufferValueType]:
         task_ids = _tasks_list_to_task_ids(async_tasks)
         session = app.backend.ResultSession()
-        task_cls = app.backend.task_cls
+        task_cls = getattr(app.backend, "task_cls", TaskDb)
         with session_cleanup(session):
             tasks = session.query(task_cls).filter(task_cls.task_id.in_(task_ids)).all()