You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:46:59 UTC

[airflow] 15/37: KubernetesExecutor sends state even when successful (#28871)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1eba989f1bd077d563e3a1b07f950367398a8281
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Fri Jan 13 00:35:16 2023 -0600

    KubernetesExecutor sends state even when successful (#28871)
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit 8a9959cc1ead49785b10b9b28c101e3d94cb4176)
---
 airflow/executors/kubernetes_executor.py    | 37 ++++++++++++++++++-----------
 tests/executors/test_kubernetes_executor.py | 25 +++++++++++++++++--
 2 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 8098486ded..34204539fa 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -214,7 +214,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
             self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
         elif status == "Succeeded":
             self.log.info("Event: %s Succeeded", pod_id)
-            self.watcher_queue.put((pod_id, namespace, None, annotations, resource_version))
+            self.watcher_queue.put((pod_id, namespace, State.SUCCESS, annotations, resource_version))
         elif status == "Running":
             if event["type"] == "DELETED":
                 self.log.info("Event: Pod %s deleted before it could complete", pod_id)
@@ -719,19 +719,26 @@ class KubernetesExecutor(BaseExecutor):
         if TYPE_CHECKING:
             assert self.kube_scheduler
 
-        if state != State.RUNNING:
-            if self.kube_config.delete_worker_pods:
-                if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure:
-                    self.kube_scheduler.delete_pod(pod_id, namespace)
-                    self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace))
-            else:
-                self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id, namespace=namespace)
-                self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace))
-            try:
-                self.running.remove(key)
-            except KeyError:
-                self.log.debug("Could not find key: %s", str(key))
-        self.event_buffer[key] = state, None
+        if state == State.RUNNING:
+            self.event_buffer[key] = state, None
+            return
+
+        if self.kube_config.delete_worker_pods:
+            if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure:
+                self.kube_scheduler.delete_pod(pod_id, namespace)
+                self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace))
+        else:
+            self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id, namespace=namespace)
+            self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace))
+
+        try:
+            self.running.remove(key)
+        except KeyError:
+            self.log.debug("TI key not in running, not adding to event_buffer: %s", key)
+        else:
+            # We get multiple events once the pod hits a terminal state, and we only want to
+            # do this once, so only do it when we remove the task from running
+            self.event_buffer[key] = state, None
 
     def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
         tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
@@ -809,6 +816,8 @@ class KubernetesExecutor(BaseExecutor):
                 )
             except ApiException as e:
                 self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
+            pod_id = annotations_to_key(pod.metadata.annotations)
+            self.running.add(pod_id)
 
     def _flush_task_queue(self) -> None:
         if TYPE_CHECKING:
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 9f5304ba4a..d24f3e8fd9 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -510,8 +510,10 @@ class TestKubernetesExecutor:
         executor.start()
         try:
             key = ("dag_id", "task_id", "run_id", "try_number1")
+            executor.running = {key}
             executor._change_state(key, State.RUNNING, "pod_id", "default")
             assert executor.event_buffer[key][0] == State.RUNNING
+            assert executor.running == {key}
         finally:
             executor.end()
 
@@ -523,8 +525,10 @@ class TestKubernetesExecutor:
         executor.start()
         try:
             key = ("dag_id", "task_id", "run_id", "try_number2")
+            executor.running = {key}
             executor._change_state(key, State.SUCCESS, "pod_id", "default")
             assert executor.event_buffer[key][0] == State.SUCCESS
+            assert executor.running == set()
             mock_delete_pod.assert_called_once_with("pod_id", "default")
         finally:
             executor.end()
@@ -541,8 +545,10 @@ class TestKubernetesExecutor:
         executor.start()
         try:
             key = ("dag_id", "task_id", "run_id", "try_number3")
+            executor.running = {key}
             executor._change_state(key, State.FAILED, "pod_id", "default")
             assert executor.event_buffer[key][0] == State.FAILED
+            assert executor.running == set()
             mock_delete_pod.assert_not_called()
         finally:
             executor.end()
@@ -562,8 +568,10 @@ class TestKubernetesExecutor:
         executor.start()
         try:
             key = ("dag_id", "task_id", "run_id", "try_number2")
+            executor.running = {key}
             executor._change_state(key, State.SUCCESS, "pod_id", "test-namespace")
             assert executor.event_buffer[key][0] == State.SUCCESS
+            assert executor.running == set()
             mock_delete_pod.assert_not_called()
             mock_patch_pod.assert_called_once_with(pod_id="pod_id", namespace="test-namespace")
         finally:
@@ -583,8 +591,10 @@ class TestKubernetesExecutor:
         executor.start()
         try:
             key = ("dag_id", "task_id", "run_id", "try_number2")
+            executor.running = {key}
             executor._change_state(key, State.FAILED, "pod_id", "test-namespace")
             assert executor.event_buffer[key][0] == State.FAILED
+            assert executor.running == set()
             mock_delete_pod.assert_called_once_with("pod_id", "test-namespace")
             mock_patch_pod.assert_not_called()
         finally:
@@ -733,17 +743,27 @@ class TestKubernetesExecutor:
         executor.kube_client = mock_kube_client
         executor.kube_config.kube_namespace = "somens"
         pod_names = ["one", "two"]
+
+        def get_annotations(pod_name):
+            return {
+                "dag_id": "dag",
+                "run_id": "run_id",
+                "task_id": pod_name,
+                "try_number": "1",
+            }
+
         mock_kube_client.list_namespaced_pod.return_value.items = [
             k8s.V1Pod(
                 metadata=k8s.V1ObjectMeta(
                     name=pod_name,
                     labels={"airflow-worker": pod_name},
-                    annotations={"some_annotation": "hello"},
+                    annotations=get_annotations(pod_name),
                     namespace="somens",
                 )
             )
             for pod_name in pod_names
         ]
+        expected_running_ti_keys = {annotations_to_key(get_annotations(pod_name)) for pod_name in pod_names}
 
         executor._adopt_completed_pods(mock_kube_client)
         mock_kube_client.list_namespaced_pod.assert_called_once_with(
@@ -763,6 +783,7 @@ class TestKubernetesExecutor:
             ],
             any_order=True,
         )
+        assert executor.running == expected_running_ti_keys
 
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
     def test_not_adopt_unassigned_task(self, mock_kube_client):
@@ -1144,7 +1165,7 @@ class TestKubernetesJobWatcher:
         self.events.append({"type": "MODIFIED", "object": self.pod})
 
         self._run()
-        self.assert_watcher_queue_called_once_with_state(None)
+        self.assert_watcher_queue_called_once_with_state(State.SUCCESS)
 
     def test_process_status_running_deleted(self):
         self.pod.status.phase = "Running"