You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/06/22 13:46:03 UTC
[airflow] 13/38: Fix Orphaned tasks stuck in CeleryExecutor as
running (#16550)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c6313e48a2ea53836b2d6619741534443f08f9aa
Author: Jorrick Sleijster <jo...@gmail.com>
AuthorDate: Tue Jun 22 10:08:00 2021 +0200
Fix Orphaned tasks stuck in CeleryExecutor as running (#16550)
(cherry picked from commit 90f0088c5752b56177597725cc716f707f2f8456)
---
airflow/executors/celery_executor.py | 4 +---
tests/executors/test_celery_executor.py | 2 ++
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 553639b..567fe58 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -369,9 +369,7 @@ class CeleryExecutor(BaseExecutor):
"\n\t".join([repr(x) for x in timedout_keys]),
)
for key in timedout_keys:
- self.event_buffer[key] = (State.FAILED, None)
- del self.tasks[key]
- del self.adopted_task_timeouts[key]
+ self.change_state(key, State.FAILED)
def debug_dump(self) -> None:
"""Called in response to SIGUSR2 by the scheduler"""
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 19c8a0d..d15ca9a 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -371,10 +371,12 @@ class TestCeleryExecutor(unittest.TestCase):
key_1: queued_dttm + executor.task_adoption_timeout,
key_2: queued_dttm + executor.task_adoption_timeout,
}
+ executor.running = {key_1, key_2}
executor.tasks = {key_1: AsyncResult("231"), key_2: AsyncResult("232")}
executor.sync()
assert executor.event_buffer == {key_1: (State.FAILED, None), key_2: (State.FAILED, None)}
assert executor.tasks == {}
+ assert executor.running == set()
assert executor.adopted_task_timeouts == {}