You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/09/11 21:02:33 UTC

[airflow] 01/02: Set task state to failed when pod is DELETED while running (#18095)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0d046727201bbc08c5579c72c6f453b93af7741e
Author: lindsable <47...@users.noreply.github.com>
AuthorDate: Sat Sep 11 16:18:32 2021 -0400

    Set task state to failed when pod is DELETED while running (#18095)
    
    There is a bug in the Kubernetes Job Watcher that occurs when a node with a running worker pod is removed from the cluster. If the worker pod doesn't complete before the node is removed, it is orphaned and forced deleted by the garbage collector. This is communicated by the API with a status='Running' but an event with type='DELETED'
    
    Because in the if statement the Job Watcher doesn't check the event type, the last information we get from the pod is that is it running. The running scheduler never gets any information about the pod and shows it as stuck in a queued state. This situation is fixed when the scheduler/executor restarts and this function is run.
    
    (cherry picked from commit e2d069f3c78a45ca29bc21b25a9e96b4e36a5d86)
---
 airflow/executors/kubernetes_executor.py    | 6 +++++-
 tests/executors/test_kubernetes_executor.py | 7 +++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 9165687..932f672 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -205,7 +205,11 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
             self.log.info('Event: %s Succeeded', pod_id)
             self.watcher_queue.put((pod_id, namespace, None, annotations, resource_version))
         elif status == 'Running':
-            self.log.info('Event: %s is Running', pod_id)
+            if event['type'] == 'DELETED':
+                self.log.info('Event: Pod %s deleted before it could complete', pod_id)
+                self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
+            else:
+                self.log.info('Event: %s is Running', pod_id)
         else:
             self.log.warning(
                 'Event: Invalid state: %s on pod: %s in namespace %s with annotations: %s with '
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 312f71f..d8e9adc 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -738,6 +738,13 @@ class TestKubernetesJobWatcher(unittest.TestCase):
         self._run()
         self.assert_watcher_queue_called_once_with_state(None)
 
+    def test_process_status_running_deleted(self):
+        self.pod.status.phase = "Running"
+        self.events.append({"type": 'DELETED', "object": self.pod})
+
+        self._run()
+        self.assert_watcher_queue_called_once_with_state(State.FAILED)
+
     def test_process_status_running(self):
         self.pod.status.phase = "Running"
         self.events.append({"type": 'MODIFIED', "object": self.pod})