You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/06/22 13:46:03 UTC

[airflow] 13/38: Fix Orphaned tasks stuck in CeleryExecutor as running (#16550)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c6313e48a2ea53836b2d6619741534443f08f9aa
Author: Jorrick Sleijster <jo...@gmail.com>
AuthorDate: Tue Jun 22 10:08:00 2021 +0200

    Fix Orphaned tasks stuck in CeleryExecutor as running (#16550)
    
    (cherry picked from commit 90f0088c5752b56177597725cc716f707f2f8456)
---
 airflow/executors/celery_executor.py    | 4 +---
 tests/executors/test_celery_executor.py | 2 ++
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 553639b..567fe58 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -369,9 +369,7 @@ class CeleryExecutor(BaseExecutor):
                 "\n\t".join([repr(x) for x in timedout_keys]),
             )
             for key in timedout_keys:
-                self.event_buffer[key] = (State.FAILED, None)
-                del self.tasks[key]
-                del self.adopted_task_timeouts[key]
+                self.change_state(key, State.FAILED)
 
     def debug_dump(self) -> None:
         """Called in response to SIGUSR2 by the scheduler"""
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 19c8a0d..d15ca9a 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -371,10 +371,12 @@ class TestCeleryExecutor(unittest.TestCase):
             key_1: queued_dttm + executor.task_adoption_timeout,
             key_2: queued_dttm + executor.task_adoption_timeout,
         }
+        executor.running = {key_1, key_2}
         executor.tasks = {key_1: AsyncResult("231"), key_2: AsyncResult("232")}
         executor.sync()
         assert executor.event_buffer == {key_1: (State.FAILED, None), key_2: (State.FAILED, None)}
         assert executor.tasks == {}
+        assert executor.running == set()
         assert executor.adopted_task_timeouts == {}