You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/20 16:14:23 UTC

[airflow] 05/16: Add "already checked" to failed pods in K8sPodOperator (#11368)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b89f8dc36ae44727e6bca12b22a19f4631d00f55
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Fri Oct 9 16:56:56 2020 -0700

    Add "already checked" to failed pods in K8sPodOperator (#11368)
---
 .../contrib/operators/kubernetes_pod_operator.py   | 24 +++++++++++---
 airflow/kubernetes/pod_generator.py                |  8 +++++
 kubernetes_tests/test_kubernetes_pod_operator.py   | 38 ++++++++++++++++++++++
 3 files changed, 65 insertions(+), 5 deletions(-)

diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 7754fd7..dcd6c3e 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -299,7 +299,9 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
 
             if len(pod_list.items) == 1:
                 try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
-                final_state, result = self.handle_pod_overlap(labels, try_numbers_match, launcher, pod_list)
+                final_state, result = self.handle_pod_overlap(
+                    labels, try_numbers_match, launcher, pod_list.items[0]
+                )
             else:
                 final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
             if final_state != State.SUCCESS:
@@ -309,7 +311,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         except AirflowException as ex:
             raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
 
-    def handle_pod_overlap(self, labels, try_numbers_match, launcher, pod_list):
+    def handle_pod_overlap(self, labels, try_numbers_match, launcher, pod):
         """
         In cases where the Scheduler restarts while a KubernetsPodOperator task is running,
         this function will either continue to monitor the existing pod or launch a new pod
@@ -319,17 +321,20 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         :param try_numbers_match: do the try numbers match? Only needed for logging purposes
         :type try_numbers_match: bool
         :param launcher: PodLauncher
-        :param pod_list: list of pods found
+        :param pod: Pod found
         """
         if try_numbers_match:
             log_line = "found a running pod with labels {} and the same try_number.".format(labels)
         else:
             log_line = "found a running pod with labels {} but a different try_number.".format(labels)
 
-        if self.reattach_on_restart:
+        # In case of failed pods, should reattach the first time, but only once
+        # as the task will have already failed.
+        if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"):
             log_line = log_line + " Will attach to this pod and monitor instead of starting new one"
             self.log.info(log_line)
-            final_state, result = self.monitor_launched_pod(launcher, pod_list.items[0])
+            self.pod = pod
+            final_state, result = self.monitor_launched_pod(launcher, pod)
         else:
             log_line = log_line + "creating pod with labels {} and launcher {}".format(labels, launcher)
             self.log.info(log_line)
@@ -452,6 +457,14 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                 launcher.delete_pod(self.pod)
         return final_state, self.pod, result
 
+    def patch_already_checked(self, pod):
+        """
+        Add an "already_checked" label to the pod so that we only reattach to it once
+        """
+        pod.metadata.labels["already_checked"] = "True"
+        body = PodGenerator.serialize_pod(pod)
+        self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body)
+
     def monitor_launched_pod(self, launcher, pod):
         """
         Monitors a pod to completion that was created by a previous KubernetesPodOperator
@@ -469,6 +482,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
             if self.log_events_on_failure:
                 for event in launcher.read_pod_events(pod).items:
                     self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            self.patch_already_checked(self.pod)
             raise AirflowException(
                 'Pod returned a failure: {state}'.format(state=final_state)
             )
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 5a57230..2d30ca9 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -564,6 +564,14 @@ class PodGenerator(object):
         return reduce(PodGenerator.reconcile_pods, pod_list)
 
     @staticmethod
+    def serialize_pod(pod):
+        """
+        Converts a k8s.V1Pod into a jsonified object
+        """
+        api_client = ApiClient()
+        return api_client.sanitize_for_serialization(pod)
+
+    @staticmethod
     def deserialize_model_file(path):
         """
         :param path: Path to the file
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index dce9e86..2208c2d 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -1087,4 +1087,42 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         with self.assertRaises(ApiException):
             pod = client.read_namespaced_pod(name=name, namespace=namespace)
 
+    def test_reattach_failing_pod_once(self):
+        from airflow.utils.state import State
+        client = kube_client.get_kube_client(in_cluster=False)
+        name = "test"
+        namespace = "default"
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["exit 1"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id=name,
+            in_cluster=False,
+            do_xcom_push=False,
+            is_delete_operator_pod=False,
+            termination_grace_period=0,
+        )
+
+        context = create_context(k)
+
+        with mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") as monitor_mock:
+            monitor_mock.return_value = (State.SUCCESS, None)
+            k.execute(context)
+            name = k.pod.metadata.name
+            pod = client.read_namespaced_pod(name=name, namespace=namespace)
+            while pod.status.phase != "Failed":
+                pod = client.read_namespaced_pod(name=name, namespace=namespace)
+        with self.assertRaises(AirflowException):
+            k.execute(context)
+        pod = client.read_namespaced_pod(name=name, namespace=namespace)
+        self.assertEqual(pod.metadata.labels["already_checked"], "True")
+        with mock.patch("airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator"
+                        ".create_new_pod_for_operator") as create_mock:
+            create_mock.return_value = ("success", {}, {})
+            k.execute(context)
+            create_mock.assert_called_once()
+
 # pylint: enable=unused-argument