You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/15 03:32:39 UTC

[airflow] 04/47: Fix KubernetesPodOperator reattachment (#10230)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 37f08f2629b882841f935b8b59650892719b87e1
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Tue Aug 11 07:01:27 2020 -0700

    Fix KubernetesPodOperator reattachment (#10230)
    
    (cherry picked from commit 8cd2be9e161635480581a0dc723b69ed24166f8d)
---
 .../contrib/operators/kubernetes_pod_operator.py   | 46 ++++++++++++++++------
 1 file changed, 33 insertions(+), 13 deletions(-)

diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 41f0df3..98464b7 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -270,23 +270,16 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
 
             pod_list = client.list_namespaced_pod(self.namespace, label_selector=label_selector)
 
-            if len(pod_list.items) > 1:
+            if len(pod_list.items) > 1 and self.reattach_on_restart:
                 raise AirflowException(
                     'More than one pod running with labels: '
                     '{label_selector}'.format(label_selector=label_selector))
 
             launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push)
 
-            if len(pod_list.items) == 1 and \
-                    self._try_numbers_do_not_match(context, pod_list.items[0]) and \
-                    self.reattach_on_restart:
-                self.log.info("found a running pod with labels %s but a different try_number"
-                              "Will attach to this pod and monitor instead of starting new one", labels)
-                final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
-            elif len(pod_list.items) == 1:
-                self.log.info("found a running pod with labels %s."
-                              "Will monitor this pod instead of starting new one", labels)
-                final_state, result = self.monitor_launched_pod(launcher, pod_list[0])
+            if len(pod_list.items) == 1:
+                try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
+                final_state, result = self.handle_pod_overlap(labels, try_numbers_match, launcher, pod_list)
             else:
                 final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
             if final_state != State.SUCCESS:
@@ -296,14 +289,41 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         except AirflowException as ex:
             raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
 
+    def handle_pod_overlap(self, labels, try_numbers_match, launcher, pod_list):
+        """
+        In cases where the Scheduler restarts while a KubernetesPodOperator task is running,
+        this function will either continue to monitor the existing pod or launch a new pod
+        based on the `reattach_on_restart` parameter.
+
+        :param labels: labels used to determine if a pod is repeated
+        :type labels: dict
+        :param try_numbers_match: do the try numbers match? Only needed for logging purposes
+        :type try_numbers_match: bool
+        :param launcher: PodLauncher
+        :param pod_list: list of pods found; ``pod_list.items[0]`` is monitored when reattaching
+        :return: tuple of (final_state, result)
+        """
+        # try_numbers_match only changes the log wording; reattach_on_restart picks the action.
+        log_line = "found a running pod with labels {} {} try_number.".format(
+            labels, "and the same" if try_numbers_match else "but a different")
+        if self.reattach_on_restart:
+            log_line += " Will attach to this pod and monitor instead of starting new one"
+            self.log.info(log_line)
+            final_state, result = self.monitor_launched_pod(launcher, pod_list.items[0])
+        else:
+            log_line += " Creating pod with labels {} and launcher {}".format(labels, launcher)
+            self.log.info(log_line)
+            final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
+        return final_state, result
+
     @staticmethod
     def _get_pod_identifying_label_string(labels):
         filtered_labels = {label_id: label for label_id, label in labels.items() if label_id != 'try_number'}
         return ','.join([label_id + '=' + label for label_id, label in sorted(filtered_labels.items())])
 
     @staticmethod
-    def _try_numbers_do_not_match(context, pod):
-        return pod.metadata.labels['try_number'] != context['ti'].try_number
+    def _try_numbers_match(context, pod):
+        return str(pod.metadata.labels['try_number']) == str(context['ti'].try_number)  # k8s labels are str
 
     @staticmethod
     def _set_resources(resources):