You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/05 17:05:23 UTC

[airflow] 02/05: Fail tasks in scheduler when executor reports they failed (#15929)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 012321b1325c8d810ae60ad7006ab9f22dfaf95e
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu May 20 11:22:01 2021 +0100

    Fail tasks in scheduler when executor reports they failed (#15929)
    
    When a task fails in the executor while it is still queued in the scheduler, the
    executor reports the failure but the scheduler doesn't change the task's state, so
    the task stays queued until the scheduler is restarted. This commit fixes that by
    ensuring that when the executor reports a task as failed, the scheduler also marks
    the task as failed.
    
    (cherry picked from commit deececcabc080844ca89272a2e4ab1183cd51e3f)
---
 airflow/jobs/scheduler_job.py    | 4 +++-
 tests/jobs/test_scheduler_job.py | 2 +-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index b99f4b2..1758ae1 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1252,12 +1252,14 @@ class SchedulerJob(BaseJob):
                     "task says its %s. (Info: %s) Was the task killed externally?"
                 )
                 self.log.error(msg, ti, state, ti.state, info)
+
                 request = TaskCallbackRequest(
                     full_filepath=ti.dag_model.fileloc,
                     simple_task_instance=SimpleTaskInstance(ti),
                     msg=msg % (ti, state, ti.state, info),
                 )
-
+                self.log.info('Setting task instance %s state to %s as reported by executor', ti, state)
+                ti.set_state(state)
                 self.processor_agent.send_callback_to_execute(request)
 
         return len(event_buffer)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 0d1f530..37ae65b 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -907,7 +907,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
-        assert ti1.state == State.QUEUED
+        assert ti1.state == State.FAILED
         mock_task_callback.assert_called_once_with(
             full_filepath='/test_path1/',
             simple_task_instance=mock.ANY,