You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:46:59 UTC
[airflow] 15/37: KubernetesExecutor sends state even when successful (#28871)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1eba989f1bd077d563e3a1b07f950367398a8281
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Fri Jan 13 00:35:16 2023 -0600
KubernetesExecutor sends state even when successful (#28871)
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
(cherry picked from commit 8a9959cc1ead49785b10b9b28c101e3d94cb4176)
---
airflow/executors/kubernetes_executor.py | 37 ++++++++++++++++++-----------
tests/executors/test_kubernetes_executor.py | 25 +++++++++++++++++--
2 files changed, 46 insertions(+), 16 deletions(-)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 8098486ded..34204539fa 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -214,7 +214,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
elif status == "Succeeded":
self.log.info("Event: %s Succeeded", pod_id)
- self.watcher_queue.put((pod_id, namespace, None, annotations, resource_version))
+ self.watcher_queue.put((pod_id, namespace, State.SUCCESS, annotations, resource_version))
elif status == "Running":
if event["type"] == "DELETED":
self.log.info("Event: Pod %s deleted before it could complete", pod_id)
@@ -719,19 +719,26 @@ class KubernetesExecutor(BaseExecutor):
if TYPE_CHECKING:
assert self.kube_scheduler
- if state != State.RUNNING:
- if self.kube_config.delete_worker_pods:
- if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure:
- self.kube_scheduler.delete_pod(pod_id, namespace)
- self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace))
- else:
- self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id, namespace=namespace)
- self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace))
- try:
- self.running.remove(key)
- except KeyError:
- self.log.debug("Could not find key: %s", str(key))
- self.event_buffer[key] = state, None
+ if state == State.RUNNING:
+ self.event_buffer[key] = state, None
+ return
+
+ if self.kube_config.delete_worker_pods:
+ if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure:
+ self.kube_scheduler.delete_pod(pod_id, namespace)
+ self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace))
+ else:
+ self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id, namespace=namespace)
+ self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace))
+
+ try:
+ self.running.remove(key)
+ except KeyError:
+ self.log.debug("TI key not in running, not adding to event_buffer: %s", key)
+ else:
+ # We get multiple events once the pod hits a terminal state, and we only want to
+ # do this once, so only do it when we remove the task from running
+ self.event_buffer[key] = state, None
def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
@@ -809,6 +816,8 @@ class KubernetesExecutor(BaseExecutor):
)
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
+ pod_id = annotations_to_key(pod.metadata.annotations)
+ self.running.add(pod_id)
def _flush_task_queue(self) -> None:
if TYPE_CHECKING:
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 9f5304ba4a..d24f3e8fd9 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -510,8 +510,10 @@ class TestKubernetesExecutor:
executor.start()
try:
key = ("dag_id", "task_id", "run_id", "try_number1")
+ executor.running = {key}
executor._change_state(key, State.RUNNING, "pod_id", "default")
assert executor.event_buffer[key][0] == State.RUNNING
+ assert executor.running == {key}
finally:
executor.end()
@@ -523,8 +525,10 @@ class TestKubernetesExecutor:
executor.start()
try:
key = ("dag_id", "task_id", "run_id", "try_number2")
+ executor.running = {key}
executor._change_state(key, State.SUCCESS, "pod_id", "default")
assert executor.event_buffer[key][0] == State.SUCCESS
+ assert executor.running == set()
mock_delete_pod.assert_called_once_with("pod_id", "default")
finally:
executor.end()
@@ -541,8 +545,10 @@ class TestKubernetesExecutor:
executor.start()
try:
key = ("dag_id", "task_id", "run_id", "try_number3")
+ executor.running = {key}
executor._change_state(key, State.FAILED, "pod_id", "default")
assert executor.event_buffer[key][0] == State.FAILED
+ assert executor.running == set()
mock_delete_pod.assert_not_called()
finally:
executor.end()
@@ -562,8 +568,10 @@ class TestKubernetesExecutor:
executor.start()
try:
key = ("dag_id", "task_id", "run_id", "try_number2")
+ executor.running = {key}
executor._change_state(key, State.SUCCESS, "pod_id", "test-namespace")
assert executor.event_buffer[key][0] == State.SUCCESS
+ assert executor.running == set()
mock_delete_pod.assert_not_called()
mock_patch_pod.assert_called_once_with(pod_id="pod_id", namespace="test-namespace")
finally:
@@ -583,8 +591,10 @@ class TestKubernetesExecutor:
executor.start()
try:
key = ("dag_id", "task_id", "run_id", "try_number2")
+ executor.running = {key}
executor._change_state(key, State.FAILED, "pod_id", "test-namespace")
assert executor.event_buffer[key][0] == State.FAILED
+ assert executor.running == set()
mock_delete_pod.assert_called_once_with("pod_id", "test-namespace")
mock_patch_pod.assert_not_called()
finally:
@@ -733,17 +743,27 @@ class TestKubernetesExecutor:
executor.kube_client = mock_kube_client
executor.kube_config.kube_namespace = "somens"
pod_names = ["one", "two"]
+
+ def get_annotations(pod_name):
+ return {
+ "dag_id": "dag",
+ "run_id": "run_id",
+ "task_id": pod_name,
+ "try_number": "1",
+ }
+
mock_kube_client.list_namespaced_pod.return_value.items = [
k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
name=pod_name,
labels={"airflow-worker": pod_name},
- annotations={"some_annotation": "hello"},
+ annotations=get_annotations(pod_name),
namespace="somens",
)
)
for pod_name in pod_names
]
+ expected_running_ti_keys = {annotations_to_key(get_annotations(pod_name)) for pod_name in pod_names}
executor._adopt_completed_pods(mock_kube_client)
mock_kube_client.list_namespaced_pod.assert_called_once_with(
@@ -763,6 +783,7 @@ class TestKubernetesExecutor:
],
any_order=True,
)
+ assert executor.running == expected_running_ti_keys
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
def test_not_adopt_unassigned_task(self, mock_kube_client):
@@ -1144,7 +1165,7 @@ class TestKubernetesJobWatcher:
self.events.append({"type": "MODIFIED", "object": self.pod})
self._run()
- self.assert_watcher_queue_called_once_with_state(None)
+ self.assert_watcher_queue_called_once_with_state(State.SUCCESS)
def test_process_status_running_deleted(self):
self.pod.status.phase = "Running"