Posted to commits@airflow.apache.org by ka...@apache.org on 2021/09/11 21:02:32 UTC

[airflow] branch v2-1-test updated (9168a0b -> 4377be3)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 9168a0b  Clearly document no breaking change for ``>=2.1.2, <=2.1.4``
     new 0d04672  Set task state to failed when pod is DELETED while running (#18095)
     new 4377be3  Update CHANGELOG.txt

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGELOG.txt                               | 1 +
 airflow/executors/kubernetes_executor.py    | 6 +++++-
 tests/executors/test_kubernetes_executor.py | 7 +++++++
 3 files changed, 13 insertions(+), 1 deletion(-)

[airflow] 02/02: Update CHANGELOG.txt

Posted by ka...@apache.org.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4377be3b38fbdfde876e727f0c73d8f978a3b1c7
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Sep 11 22:01:53 2021 +0100

    Update CHANGELOG.txt
---
 CHANGELOG.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.txt b/CHANGELOG.txt
index 792cebf..b5749d6 100644
--- a/CHANGELOG.txt
+++ b/CHANGELOG.txt
@@ -26,6 +26,7 @@ Bug Fixes
 - Improve discoverability of Provider packages' functionality
 - Do not let ``create_dagrun`` overwrite explicit ``run_id`` (#17728)
 - BugFix: Regression on pid reset to allow task start after heartbeat (#17333)
+- Set task state to failed when pod is DELETED while running (#18095)
 
 Doc only changes
 """"""""""""""""

[airflow] 01/02: Set task state to failed when pod is DELETED while running (#18095)

Posted by ka...@apache.org.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0d046727201bbc08c5579c72c6f453b93af7741e
Author: lindsable <47...@users.noreply.github.com>
AuthorDate: Sat Sep 11 16:18:32 2021 -0400

    Set task state to failed when pod is DELETED while running (#18095)
    
    There is a bug in the Kubernetes Job Watcher that occurs when a node with a running worker pod is removed from the cluster. If the worker pod does not complete before the node is removed, the pod is orphaned and force-deleted by the garbage collector. The API reports this as an event with type='DELETED' whose pod status is still 'Running'.
    
    Because the if statement in the Job Watcher does not check the event type for a 'Running' status, the last information the watcher records for the pod is that it is running. The running scheduler therefore never receives a final state for the pod and shows it as stuck in a queued state. The situation only resolves itself when the scheduler/executor restarts and this function is run.
    
    (cherry picked from commit e2d069f3c78a45ca29bc21b25a9e96b4e36a5d86)
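To see the decision logic in isolation, here is a minimal sketch of the Succeeded and Running branches of KubernetesJobWatcher.process_status as changed by this commit. It is a simplification, not Airflow code: report_for_event is a hypothetical helper, FAILED is a plain-string stand-in for State.FAILED, and the other pod phases the real method handles are deliberately not modelled.

```python
from typing import Optional, Tuple

# Hypothetical stand-in for airflow.utils.state.State.FAILED.
FAILED = "failed"


def report_for_event(phase: str, event_type: str) -> Tuple[bool, Optional[str]]:
    """Decide what, if anything, the watcher reports for one pod event.

    Returns (report, state). report=False means nothing is put on the
    watcher queue; (True, None) mirrors the Succeeded branch, which queues
    the pod with no explicit state.
    """
    if phase == "Succeeded":
        return True, None
    if phase == "Running":
        if event_type == "DELETED":
            # The fix from #18095: a pod deleted while still Running is
            # reported as failed instead of only being logged.
            return True, FAILED
        # Still running: the watcher logs the event and reports nothing.
        return False, None
    raise ValueError(f"phase {phase!r} is not modelled in this sketch")


# The orphaned-pod scenario: the final event still carries phase "Running"
# but has type "DELETED".
print(report_for_event("Running", "MODIFIED"))  # (False, None)
print(report_for_event("Running", "DELETED"))   # (True, 'failed')
```

Before the fix, both calls would have fallen into the "log only" path, which is why the scheduler never learned that the task had ended.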
---
 airflow/executors/kubernetes_executor.py    | 6 +++++-
 tests/executors/test_kubernetes_executor.py | 7 +++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 9165687..932f672 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -205,7 +205,11 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
             self.log.info('Event: %s Succeeded', pod_id)
             self.watcher_queue.put((pod_id, namespace, None, annotations, resource_version))
         elif status == 'Running':
-            self.log.info('Event: %s is Running', pod_id)
+            if event['type'] == 'DELETED':
+                self.log.info('Event: Pod %s deleted before it could complete', pod_id)
+                self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
+            else:
+                self.log.info('Event: %s is Running', pod_id)
         else:
             self.log.warning(
                 'Event: Invalid state: %s on pod: %s in namespace %s with annotations: %s with '
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 312f71f..d8e9adc 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -738,6 +738,13 @@ class TestKubernetesJobWatcher(unittest.TestCase):
         self._run()
         self.assert_watcher_queue_called_once_with_state(None)
 
+    def test_process_status_running_deleted(self):
+        self.pod.status.phase = "Running"
+        self.events.append({"type": 'DELETED', "object": self.pod})
+
+        self._run()
+        self.assert_watcher_queue_called_once_with_state(State.FAILED)
+
     def test_process_status_running(self):
         self.pod.status.phase = "Running"
         self.events.append({"type": 'MODIFIED', "object": self.pod})
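The new test feeds a single DELETED event for a Running pod through the watcher and asserts that exactly one FAILED entry is queued. The sketch below reproduces that pattern without Airflow or a cluster. MiniWatcher, make_pod, drain, the dict-shaped events and the plain-string state are hypothetical simplifications, not the fixtures used in tests/executors/test_kubernetes_executor.py.

```python
import queue
import unittest
from types import SimpleNamespace

# Hypothetical stand-in for airflow.utils.state.State.FAILED.
FAILED = "failed"


class MiniWatcher:
    """Hypothetical, stripped-down stand-in for KubernetesJobWatcher."""

    def __init__(self) -> None:
        self.watcher_queue: "queue.Queue[tuple]" = queue.Queue()

    def run(self, events) -> None:
        # Process each watch event in order.
        for event in events:
            pod = event["object"]
            self.process_status(pod.metadata.name, pod.status.phase, event)

    def process_status(self, pod_id, status, event) -> None:
        if status == "Succeeded":
            self.watcher_queue.put((pod_id, None))
        elif status == "Running":
            if event["type"] == "DELETED":
                self.watcher_queue.put((pod_id, FAILED))
            # A plain Running event is only logged, so nothing is queued.


def make_pod(name: str, phase: str) -> SimpleNamespace:
    # Mimic the attribute shape of a V1Pod: pod.metadata.name, pod.status.phase.
    return SimpleNamespace(
        metadata=SimpleNamespace(name=name),
        status=SimpleNamespace(phase=phase),
    )


def drain(q: "queue.Queue[tuple]") -> list:
    # Collect everything currently on the queue (single-threaded use only).
    items = []
    while not q.empty():
        items.append(q.get_nowait())
    return items


class TestMiniWatcher(unittest.TestCase):
    def test_running_deleted_is_reported_failed(self):
        watcher = MiniWatcher()
        watcher.run([{"type": "DELETED", "object": make_pod("pod-a", "Running")}])
        self.assertEqual(drain(watcher.watcher_queue), [("pod-a", FAILED)])

    def test_running_modified_reports_nothing(self):
        watcher = MiniWatcher()
        watcher.run([{"type": "MODIFIED", "object": make_pod("pod-a", "Running")}])
        self.assertEqual(drain(watcher.watcher_queue), [])
```

Saved to a file, this runs with python -m unittest <file>. The first case corresponds to test_process_status_running_deleted; the second checks that an ordinary Running event still queues nothing.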