You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2023/03/07 13:42:09 UTC

[airflow] branch main updated: Fix KubernetesPodOperator xcom push when `get_logs=False` (#29052)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e81a98cc6 Fix KubernetesPodOperator xcom push when `get_logs=False` (#29052)
1e81a98cc6 is described below

commit 1e81a98cc69344a35c50b00e2d25a6d48a9bded2
Author: Hussein Awala <ho...@gmail.com>
AuthorDate: Tue Mar 7 14:41:59 2023 +0100

    Fix KubernetesPodOperator xcom push when `get_logs=False` (#29052)
    
    * wait until the container is not terminated instead of waiting while it's running
---
 .../providers/cncf/kubernetes/utils/pod_manager.py  | 21 ++++++++++++++++++++-
 .../cncf/kubernetes/utils/test_pod_manager.py       | 19 +++++++++++++++++++
 2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 3345c63cc8..161a931a69 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -96,6 +96,20 @@ def container_is_running(pod: V1Pod, container_name: str) -> bool:
     return container_status.state.running is not None
 
 
+def container_is_terminated(pod: V1Pod, container_name: str) -> bool:
+    """
+    Examines V1Pod ``pod`` to determine whether ``container_name`` is terminated.
+    If that container is present and terminated, returns True.  Returns False otherwise.
+    """
+    container_statuses = pod.status.container_statuses if pod and pod.status else None
+    if not container_statuses:
+        return False
+    container_status = next((x for x in container_statuses if x.name == container_name), None)
+    if not container_status:
+        return False
+    return container_status.state.terminated is not None
+
+
 def get_container_termination_message(pod: V1Pod, container_name: str):
     with suppress(AttributeError, TypeError):
         container_statuses = pod.status.container_statuses
@@ -380,7 +394,7 @@ class PodManager(LoggingMixin):
         :param pod: pod spec that will be monitored
         :param container_name: name of the container within the pod to monitor
         """
-        while self.container_is_running(pod=pod, container_name=container_name):
+        while not self.container_is_terminated(pod=pod, container_name=container_name):
             time.sleep(1)
 
     def await_pod_completion(self, pod: V1Pod) -> V1Pod:
@@ -427,6 +441,11 @@ class PodManager(LoggingMixin):
         remote_pod = self.read_pod(pod)
         return container_is_running(pod=remote_pod, container_name=container_name)
 
+    def container_is_terminated(self, pod: V1Pod, container_name: str) -> bool:
+        """Reads pod and checks if container is terminated"""
+        remote_pod = self.read_pod(pod)
+        return container_is_terminated(pod=remote_pod, container_name=container_name)
+
     @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
     def read_pod_logs(
         self,
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index 0e3e84f1e5..b8b768d4d5 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -35,6 +35,7 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager import (
     PodManager,
     PodPhase,
     container_is_running,
+    container_is_terminated,
 )
 from airflow.utils.timezone import utc
 from tests.test_utils.providers import get_provider_version, object_exists
@@ -366,6 +367,24 @@ class TestPodManager:
                 "and make kube_client a required argument."
             )
 
+    @pytest.mark.parametrize(
+        "container_state, expected_is_terminated",
+        [("waiting", False), ("running", False), ("terminated", True)],
+    )
+    def test_container_is_terminated_with_waiting_state(self, container_state, expected_is_terminated):
+        container_status = MagicMock()
+        container_status.configure_mock(
+            **{
+                "name": "base",
+                "state.waiting": True if container_state == "waiting" else None,
+                "state.running": True if container_state == "running" else None,
+                "state.terminated": True if container_state == "terminated" else None,
+            }
+        )
+        pod_info = MagicMock()
+        pod_info.status.container_statuses = [container_status]
+        assert container_is_terminated(pod_info, "base") == expected_is_terminated
+
 
 def params_for_test_container_is_running():
     """The `container_is_running` method is designed to handle an assortment of bad objects