You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2023/03/07 13:42:09 UTC
[airflow] branch main updated: Fix KubernetesPodOperator xcom push when `get_logs=False` (#29052)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1e81a98cc6 Fix KubernetesPodOperator xcom push when `get_logs=False` (#29052)
1e81a98cc6 is described below
commit 1e81a98cc69344a35c50b00e2d25a6d48a9bded2
Author: Hussein Awala <ho...@gmail.com>
AuthorDate: Tue Mar 7 14:41:59 2023 +0100
Fix KubernetesPodOperator xcom push when `get_logs=False` (#29052)
* wait until the container is terminated instead of waiting only while it's running
---
.../providers/cncf/kubernetes/utils/pod_manager.py | 21 ++++++++++++++++++++-
.../cncf/kubernetes/utils/test_pod_manager.py | 19 +++++++++++++++++++
2 files changed, 39 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 3345c63cc8..161a931a69 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -96,6 +96,20 @@ def container_is_running(pod: V1Pod, container_name: str) -> bool:
return container_status.state.running is not None
+def container_is_terminated(pod: V1Pod, container_name: str) -> bool:
+ """
+ Examines V1Pod ``pod`` to determine whether ``container_name`` is terminated.
+ If that container is present and terminated, returns True. Returns False otherwise.
+ """
+ container_statuses = pod.status.container_statuses if pod and pod.status else None
+ if not container_statuses:
+ return False
+ container_status = next((x for x in container_statuses if x.name == container_name), None)
+ if not container_status:
+ return False
+ return container_status.state.terminated is not None
+
+
def get_container_termination_message(pod: V1Pod, container_name: str):
with suppress(AttributeError, TypeError):
container_statuses = pod.status.container_statuses
@@ -380,7 +394,7 @@ class PodManager(LoggingMixin):
:param pod: pod spec that will be monitored
:param container_name: name of the container within the pod to monitor
"""
- while self.container_is_running(pod=pod, container_name=container_name):
+ while not self.container_is_terminated(pod=pod, container_name=container_name):
time.sleep(1)
def await_pod_completion(self, pod: V1Pod) -> V1Pod:
@@ -427,6 +441,11 @@ class PodManager(LoggingMixin):
remote_pod = self.read_pod(pod)
return container_is_running(pod=remote_pod, container_name=container_name)
+ def container_is_terminated(self, pod: V1Pod, container_name: str) -> bool:
+ """Reads pod and checks if container is terminated"""
+ remote_pod = self.read_pod(pod)
+ return container_is_terminated(pod=remote_pod, container_name=container_name)
+
@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def read_pod_logs(
self,
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index 0e3e84f1e5..b8b768d4d5 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -35,6 +35,7 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager import (
PodManager,
PodPhase,
container_is_running,
+ container_is_terminated,
)
from airflow.utils.timezone import utc
from tests.test_utils.providers import get_provider_version, object_exists
@@ -366,6 +367,24 @@ class TestPodManager:
"and make kube_client a required argument."
)
+ @pytest.mark.parametrize(
+ "container_state, expected_is_terminated",
+ [("waiting", False), ("running", False), ("terminated", True)],
+ )
+ def test_container_is_terminated_with_waiting_state(self, container_state, expected_is_terminated):
+ container_status = MagicMock()
+ container_status.configure_mock(
+ **{
+ "name": "base",
+ "state.waiting": True if container_state == "waiting" else None,
+ "state.running": True if container_state == "running" else None,
+ "state.terminated": True if container_state == "terminated" else None,
+ }
+ )
+ pod_info = MagicMock()
+ pod_info.status.container_statuses = [container_status]
+ assert container_is_terminated(pod_info, "base") == expected_is_terminated
+
def params_for_test_container_is_running():
"""The `container_is_running` method is designed to handle an assortment of bad objects