You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/20 16:14:23 UTC
[airflow] 05/16: Add "already checked" to failed pods in K8sPodOperator (#11368)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b89f8dc36ae44727e6bca12b22a19f4631d00f55
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Fri Oct 9 16:56:56 2020 -0700
Add "already checked" to failed pods in K8sPodOperator (#11368)
---
.../contrib/operators/kubernetes_pod_operator.py | 24 +++++++++++---
airflow/kubernetes/pod_generator.py | 8 +++++
kubernetes_tests/test_kubernetes_pod_operator.py | 38 ++++++++++++++++++++++
3 files changed, 65 insertions(+), 5 deletions(-)
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 7754fd7..dcd6c3e 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -299,7 +299,9 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
if len(pod_list.items) == 1:
try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
- final_state, result = self.handle_pod_overlap(labels, try_numbers_match, launcher, pod_list)
+ final_state, result = self.handle_pod_overlap(
+ labels, try_numbers_match, launcher, pod_list.items[0]
+ )
else:
final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
if final_state != State.SUCCESS:
@@ -309,7 +311,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
except AirflowException as ex:
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
- def handle_pod_overlap(self, labels, try_numbers_match, launcher, pod_list):
+ def handle_pod_overlap(self, labels, try_numbers_match, launcher, pod):
"""
In cases where the Scheduler restarts while a KubernetsPodOperator task is running,
this function will either continue to monitor the existing pod or launch a new pod
@@ -319,17 +321,20 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:param try_numbers_match: do the try numbers match? Only needed for logging purposes
:type try_numbers_match: bool
:param launcher: PodLauncher
- :param pod_list: list of pods found
+ :param pod: Pod found
"""
if try_numbers_match:
log_line = "found a running pod with labels {} and the same try_number.".format(labels)
else:
log_line = "found a running pod with labels {} but a different try_number.".format(labels)
- if self.reattach_on_restart:
+ # In case of failed pods, should reattach the first time, but only once
+ # as the task will have already failed.
+ if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"):
log_line = log_line + " Will attach to this pod and monitor instead of starting new one"
self.log.info(log_line)
- final_state, result = self.monitor_launched_pod(launcher, pod_list.items[0])
+ self.pod = pod
+ final_state, result = self.monitor_launched_pod(launcher, pod)
else:
log_line = log_line + "creating pod with labels {} and launcher {}".format(labels, launcher)
self.log.info(log_line)
@@ -452,6 +457,14 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
launcher.delete_pod(self.pod)
return final_state, self.pod, result
+ def patch_already_checked(self, pod):
+ """
+ Add an "already_checked" label to the pod to ensure we only reattach to it once
+ """
+ pod.metadata.labels["already_checked"] = "True"
+ body = PodGenerator.serialize_pod(pod)
+ self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body)
+
def monitor_launched_pod(self, launcher, pod):
"""
Monitors a pod to completion that was created by a previous KubernetesPodOperator
@@ -469,6 +482,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
if self.log_events_on_failure:
for event in launcher.read_pod_events(pod).items:
self.log.error("Pod Event: %s - %s", event.reason, event.message)
+ self.patch_already_checked(self.pod)
raise AirflowException(
'Pod returned a failure: {state}'.format(state=final_state)
)
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 5a57230..2d30ca9 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -564,6 +564,14 @@ class PodGenerator(object):
return reduce(PodGenerator.reconcile_pods, pod_list)
@staticmethod
+ def serialize_pod(pod):
+ """
+ Converts a k8s.V1Pod into a jsonified object
+ """
+ api_client = ApiClient()
+ return api_client.sanitize_for_serialization(pod)
+
+ @staticmethod
def deserialize_model_file(path):
"""
:param path: Path to the file
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index dce9e86..2208c2d 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -1087,4 +1087,42 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
with self.assertRaises(ApiException):
pod = client.read_namespaced_pod(name=name, namespace=namespace)
+ def test_reattach_failing_pod_once(self):
+ from airflow.utils.state import State
+ client = kube_client.get_kube_client(in_cluster=False)
+ name = "test"
+ namespace = "default"
+ k = KubernetesPodOperator(
+ namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["exit 1"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id=name,
+ in_cluster=False,
+ do_xcom_push=False,
+ is_delete_operator_pod=False,
+ termination_grace_period=0,
+ )
+
+ context = create_context(k)
+
+ with mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") as monitor_mock:
+ monitor_mock.return_value = (State.SUCCESS, None)
+ k.execute(context)
+ name = k.pod.metadata.name
+ pod = client.read_namespaced_pod(name=name, namespace=namespace)
+ while pod.status.phase != "Failed":
+ pod = client.read_namespaced_pod(name=name, namespace=namespace)
+ with self.assertRaises(AirflowException):
+ k.execute(context)
+ pod = client.read_namespaced_pod(name=name, namespace=namespace)
+ self.assertEqual(pod.metadata.labels["already_checked"], "True")
+ with mock.patch("airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator"
+ ".create_new_pod_for_operator") as create_mock:
+ create_mock.return_value = ("success", {}, {})
+ k.execute(context)
+ create_mock.assert_called_once()
+
# pylint: enable=unused-argument