You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/15 03:32:39 UTC
[airflow] 04/47: Fix KubernetesPodOperator reattachment (#10230)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 37f08f2629b882841f935b8b59650892719b87e1
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Tue Aug 11 07:01:27 2020 -0700
Fix KubernetesPodOperator reattachment (#10230)
(cherry picked from commit 8cd2be9e161635480581a0dc723b69ed24166f8d)
---
.../contrib/operators/kubernetes_pod_operator.py | 46 ++++++++++++++++------
1 file changed, 33 insertions(+), 13 deletions(-)
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 41f0df3..98464b7 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -270,23 +270,16 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
pod_list = client.list_namespaced_pod(self.namespace, label_selector=label_selector)
- if len(pod_list.items) > 1:
+ if len(pod_list.items) > 1 and self.reattach_on_restart:
raise AirflowException(
'More than one pod running with labels: '
'{label_selector}'.format(label_selector=label_selector))
launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push)
- if len(pod_list.items) == 1 and \
- self._try_numbers_do_not_match(context, pod_list.items[0]) and \
- self.reattach_on_restart:
- self.log.info("found a running pod with labels %s but a different try_number"
- "Will attach to this pod and monitor instead of starting new one", labels)
- final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
- elif len(pod_list.items) == 1:
- self.log.info("found a running pod with labels %s."
- "Will monitor this pod instead of starting new one", labels)
- final_state, result = self.monitor_launched_pod(launcher, pod_list[0])
+ if len(pod_list.items) == 1:
+ try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
+ final_state, result = self.handle_pod_overlap(labels, try_numbers_match, launcher, pod_list)
else:
final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
if final_state != State.SUCCESS:
@@ -296,14 +289,41 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
except AirflowException as ex:
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
+ def handle_pod_overlap(self, labels, try_numbers_match, launcher, pod_list):
+ """
+ Handle an already-running pod (e.g. after a Scheduler restart mid-task): monitor it when
+ `reattach_on_restart` is True, otherwise launch a new pod with the same labels.
+ :param labels: labels used to determine if a pod is repeated
+ :type labels: dict
+ :param try_numbers_match: do the try numbers match? Only needed for logging purposes
+ :type try_numbers_match: bool
+ :param launcher: PodLauncher used to monitor or create the pod
+ :param pod_list: result of ``list_namespaced_pod``; only ``pod_list.items[0]`` is used
+ :return: tuple of (final_state, result)
+ """
+ if try_numbers_match:
+ log_line = "found a running pod with labels {} and the same try_number.".format(labels)
+ else:
+ log_line = "found a running pod with labels {} but a different try_number.".format(labels)
+
+ if self.reattach_on_restart:
+ log_line = log_line + " Will attach to this pod and monitor instead of starting new one"
+ self.log.info(log_line)
+ final_state, result = self.monitor_launched_pod(launcher, pod_list.items[0])
+ else:
+ log_line = log_line + " Will create a new pod instead of reattaching to this one"
+ self.log.info(log_line)
+ final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
+ return final_state, result
+
@staticmethod
def _get_pod_identifying_label_string(labels):
filtered_labels = {label_id: label for label_id, label in labels.items() if label_id != 'try_number'}
return ','.join([label_id + '=' + label for label_id, label in sorted(filtered_labels.items())])
@staticmethod
- def _try_numbers_do_not_match(context, pod):
- return pod.metadata.labels['try_number'] != context['ti'].try_number
+ def _try_numbers_match(context, pod):
+ return pod.metadata.labels['try_number'] == str(context['ti'].try_number)  # k8s label values are str
@staticmethod
def _set_resources(resources):