You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2021/09/11 20:18:50 UTC

[airflow] branch main updated: Set task state to failed when pod is DELETED while running (#18095)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e2d069f  Set task state to failed when pod is DELETED while running (#18095)
e2d069f is described below

commit e2d069f3c78a45ca29bc21b25a9e96b4e36a5d86
Author: lindsable <47...@users.noreply.github.com>
AuthorDate: Sat Sep 11 16:18:32 2021 -0400

    Set task state to failed when pod is DELETED while running (#18095)
    
    There is a bug in the Kubernetes Job Watcher that occurs when a node with a running worker pod is removed from the cluster. If the worker pod doesn't complete before the node is removed, it is orphaned and force-deleted by the garbage collector. The API communicates this as a pod with status='Running' but an event with type='DELETED'.
    
    Because the Job Watcher's if statement doesn't check the event type, the last information we get from the pod is that it is running. The running scheduler never gets any information about the pod and shows it as stuck in a queued state. This situation is fixed when the scheduler/executor restarts and this function is run.
---
 airflow/executors/kubernetes_executor.py    | 6 +++++-
 tests/executors/test_kubernetes_executor.py | 7 +++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 993787c..5e748da 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -206,7 +206,11 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
             self.log.info('Event: %s Succeeded', pod_id)
             self.watcher_queue.put((pod_id, namespace, None, annotations, resource_version))
         elif status == 'Running':
-            self.log.info('Event: %s is Running', pod_id)
+            if event['type'] == 'DELETED':
+                self.log.info('Event: Pod %s deleted before it could complete', pod_id)
+                self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
+            else:
+                self.log.info('Event: %s is Running', pod_id)
         else:
             self.log.warning(
                 'Event: Invalid state: %s on pod: %s in namespace %s with annotations: %s with '
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 025b956..2fa0b8a 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -738,6 +738,13 @@ class TestKubernetesJobWatcher(unittest.TestCase):
         self._run()
         self.assert_watcher_queue_called_once_with_state(None)
 
+    def test_process_status_running_deleted(self):
+        self.pod.status.phase = "Running"
+        self.events.append({"type": 'DELETED', "object": self.pod})
+
+        self._run()
+        self.assert_watcher_queue_called_once_with_state(State.FAILED)
+
     def test_process_status_running(self):
         self.pod.status.phase = "Running"
         self.events.append({"type": 'MODIFIED', "object": self.pod})