Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:46:56 UTC

[airflow] 12/37: Annotate KubernetesExecutor pods that we don't delete (#28844)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9ad5779a54e3a2a25449336e1e1f91b0048ef17c
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Wed Jan 11 15:15:21 2023 -0600

    Annotate KubernetesExecutor pods that we don't delete (#28844)
    
    We weren't keeping track of which pods we'd already finished with, so if
    you had `[kubernetes_executor] delete_worker_pods` set to false, your
    KubernetesExecutor would adopt every single remaining pod when starting
    up. Every time.
    
    We now mark pods with the `airflow_executor_done` label when processing a
    pod event from the watcher, so we can skip them during adoption.
    
    (cherry picked from commit 72da8bff12e3133045b61936952599a84d3f53a2)
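
Despite the word "annotate" in the title, the marker is applied as a
Kubernetes label (under `metadata.labels`), which is what lets the adoption
query exclude it via a label selector. Below is a minimal, self-contained
sketch of both sides of the mechanism using the stock `kubernetes` Python
client; the helper names `mark_pod_done` and `find_adoptable_pods` are
illustrative, not part of this commit:

    from kubernetes import client, config

    POD_EXECUTOR_DONE_KEY = "airflow_executor_done"

    def mark_pod_done(kube: client.CoreV1Api, pod_id: str, namespace: str) -> None:
        # Strategic-merge patch: adds (or overwrites) one label on the pod.
        kube.patch_namespaced_pod(
            name=pod_id,
            namespace=namespace,
            body={"metadata": {"labels": {POD_EXECUTOR_DONE_KEY: "True"}}},
        )

    def find_adoptable_pods(kube: client.CoreV1Api, namespace: str, new_worker_label: str):
        # Succeeded pods from other workers that are NOT yet marked done;
        # these are the only ones a restarting executor should consider.
        return kube.list_namespaced_pod(
            namespace=namespace,
            field_selector="status.phase=Succeeded",
            label_selector=(
                "kubernetes_executor=True,"
                f"airflow-worker!={new_worker_label},{POD_EXECUTOR_DONE_KEY}!=True"
            ),
        )

    if __name__ == "__main__":
        config.load_kube_config()  # assumes a reachable cluster/kubeconfig
        v1 = client.CoreV1Api()
        for pod in find_adoptable_pods(v1, "airflow", "current-scheduler-id").items:
            print(pod.metadata.name)

The same exclusion can be checked from the CLI with
`kubectl get pods -l 'kubernetes_executor=True,airflow_executor_done!=True'`.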
---
 airflow/executors/kubernetes_executor.py    | 22 +++++++++++++++++++++-
 tests/executors/test_kubernetes_executor.py | 18 ++++++++++++------
 2 files changed, 33 insertions(+), 7 deletions(-)
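
The behavioral core of the patch is the new branch in
`KubernetesExecutor._change_state`. A hedged paraphrase of its decision
logic as a standalone sketch (`decide_pod_cleanup` is a hypothetical
helper, not a function in Airflow):

    from airflow.utils.state import State

    def decide_pod_cleanup(kube_config, kube_scheduler, state, pod_id: str, namespace: str) -> None:
        # Mirrors the branch this patch adds to _change_state (simplified;
        # the real method also removes the task key from executor.running).
        if kube_config.delete_worker_pods:
            # Failed pods are kept unless delete_worker_pods_on_failure is set.
            if state != State.FAILED or kube_config.delete_worker_pods_on_failure:
                kube_scheduler.delete_pod(pod_id, namespace)
        else:
            # Keep the pod, but label it done so later adoption scans skip it.
            kube_scheduler.patch_pod_executor_done(pod_id=pod_id, namespace=namespace)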

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 7dcba12968..8098486ded 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -52,6 +52,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
+POD_EXECUTOR_DONE_KEY = "airflow_executor_done"
+
 # TaskInstance key, command, configuration, pod_template_file
 KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]
 
@@ -360,6 +362,18 @@ class AirflowKubernetesScheduler(LoggingMixin):
             if e.status != 404:
                 raise
 
+    def patch_pod_executor_done(self, *, pod_id: str, namespace: str):
+        """Add a "done" annotation to ensure we don't continually adopt pods"""
+        self.log.debug("Patching pod %s in namespace %s to mark it as done", pod_id, namespace)
+        try:
+            self.kube_client.patch_namespaced_pod(
+                name=pod_id,
+                namespace=namespace,
+                body={"metadata": {"labels": {POD_EXECUTOR_DONE_KEY: "True"}}},
+            )
+        except ApiException as e:
+            self.log.info("Failed to patch pod %s with done label. Reason: %s", pod_id, e)
+
     def sync(self) -> None:
         """
         The sync function checks the status of all currently running kubernetes jobs.
@@ -710,6 +724,9 @@ class KubernetesExecutor(BaseExecutor):
                 if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure:
                     self.kube_scheduler.delete_pod(pod_id, namespace)
                     self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace))
+            else:
+                self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id, namespace=namespace)
+                self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace))
             try:
                 self.running.remove(key)
             except KeyError:
@@ -776,7 +793,10 @@ class KubernetesExecutor(BaseExecutor):
         new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id)
         kwargs = {
             "field_selector": "status.phase=Succeeded",
-            "label_selector": f"kubernetes_executor=True,airflow-worker!={new_worker_id_label}",
+            "label_selector": (
+                "kubernetes_executor=True,"
+                f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True"
+            ),
         }
         pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs)
         for pod in pod_list.items:
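
The test updates below stop patching a single method and instead patch the
whole `AirflowKubernetesScheduler` class, asserting against methods of the
mock's `return_value` (the instance the executor constructs for itself). A
tiny generic illustration of that `unittest.mock` pattern, independent of
Airflow:

    from unittest import mock

    class Scheduler:
        def delete_pod(self, pod_id, namespace): ...
        def patch_pod_executor_done(self, *, pod_id, namespace): ...

    def finish_task():
        # Code under test constructs its own Scheduler instance.
        Scheduler().patch_pod_executor_done(pod_id="p", namespace="ns")

    with mock.patch(f"{__name__}.Scheduler") as mock_cls:
        finish_task()
        # The instance created inside finish_task() is mock_cls.return_value,
        # so its methods can be asserted on without holding the instance.
        mock_cls.return_value.patch_pod_executor_done.assert_called_once_with(
            pod_id="p", namespace="ns"
        )
        mock_cls.return_value.delete_pod.assert_not_called()
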
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index d1210765c0..9f5304ba4a 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -549,10 +549,12 @@ class TestKubernetesExecutor:
 
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
-    @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod")
+    @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler")
     def test_change_state_skip_pod_deletion(
-        self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher
+        self, mock_kubescheduler, mock_get_kube_client, mock_kubernetes_job_watcher
     ):
+        mock_delete_pod = mock_kubescheduler.return_value.delete_pod
+        mock_patch_pod = mock_kubescheduler.return_value.patch_pod_executor_done
         executor = self.kubernetes_executor
         executor.kube_config.delete_worker_pods = False
         executor.kube_config.delete_worker_pods_on_failure = False
@@ -560,18 +562,21 @@ class TestKubernetesExecutor:
         executor.start()
         try:
             key = ("dag_id", "task_id", "run_id", "try_number2")
-            executor._change_state(key, State.SUCCESS, "pod_id", "default")
+            executor._change_state(key, State.SUCCESS, "pod_id", "test-namespace")
             assert executor.event_buffer[key][0] == State.SUCCESS
             mock_delete_pod.assert_not_called()
+            mock_patch_pod.assert_called_once_with(pod_id="pod_id", namespace="test-namespace")
         finally:
             executor.end()
 
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
-    @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod")
+    @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler")
     def test_change_state_failed_pod_deletion(
-        self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher
+        self, mock_kubescheduler, mock_get_kube_client, mock_kubernetes_job_watcher
     ):
+        mock_delete_pod = mock_kubescheduler.return_value.delete_pod
+        mock_patch_pod = mock_kubescheduler.return_value.patch_pod_executor_done
         executor = self.kubernetes_executor
         executor.kube_config.delete_worker_pods_on_failure = True
 
@@ -581,6 +586,7 @@ class TestKubernetesExecutor:
             executor._change_state(key, State.FAILED, "pod_id", "test-namespace")
             assert executor.event_buffer[key][0] == State.FAILED
             mock_delete_pod.assert_called_once_with("pod_id", "test-namespace")
+            mock_patch_pod.assert_not_called()
         finally:
             executor.end()
 
@@ -743,7 +749,7 @@ class TestKubernetesExecutor:
         mock_kube_client.list_namespaced_pod.assert_called_once_with(
             namespace="somens",
             field_selector="status.phase=Succeeded",
-            label_selector="kubernetes_executor=True,airflow-worker!=modified",
+            label_selector="kubernetes_executor=True,airflow-worker!=modified,airflow_executor_done!=True",
         )
         assert len(pod_names) == mock_kube_client.patch_namespaced_pod.call_count
         mock_kube_client.patch_namespaced_pod.assert_has_calls(