You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/06 18:11:23 UTC

[GitHub] [airflow] dimberman commented on a change in pull request #6377: Monitor pods by labels instead of names

dimberman commented on a change in pull request #6377:
URL: https://github.com/apache/airflow/pull/6377#discussion_r420992498



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -235,84 +259,46 @@ def execute(self, context):
                 client = kube_client.get_kube_client(cluster_context=self.cluster_context,
                                                      config_file=self.config_file)
 
-            if not (self.full_pod_spec or self.pod_template_file):
-                # Add Airflow Version to the label
-                # And a label to identify that pod is launched by KubernetesPodOperator
-                self.labels.update(
-                    {
-                        'airflow_version': airflow_version.replace('+', '-'),
-                        'kubernetes_pod_operator': 'True',
-                    }
-                )
-            pod = pod_generator.PodGenerator(
-                image=self.image,
-                namespace=self.namespace,
-                cmds=self.cmds,
-                args=self.arguments,
-                labels=self.labels,
-                name=self.name,
-                envs=self.env_vars,
-                extract_xcom=self.do_xcom_push,
-                image_pull_policy=self.image_pull_policy,
-                node_selectors=self.node_selectors,
-                annotations=self.annotations,
-                affinity=self.affinity,
-                image_pull_secrets=self.image_pull_secrets,
-                service_account_name=self.service_account_name,
-                hostnetwork=self.hostnetwork,
-                tolerations=self.tolerations,
-                configmaps=self.configmaps,
-                security_context=self.security_context,
-                dnspolicy=self.dnspolicy,
-                schedulername=self.schedulername,
-                init_containers=self.init_containers,
-                restart_policy='Never',
-                priority_class_name=self.priority_class_name,
-                pod_template_file=self.pod_template_file,
-                pod=self.full_pod_spec,
-            ).gen_pod()
+            # Add combination of labels to uniquely identify a running pod
+            labels = self.create_labels_for_pod(context)
 
-            pod = append_to_pod(
-                pod,
-                self.pod_runtime_info_envs +
-                self.ports +
-                self.resources +
-                self.secrets +
-                self.volumes +
-                self.volume_mounts
-            )
+            label_selector = self._get_pod_identifying_label_string(labels)
 
-            self.pod = pod
+            pod_list = client.list_namespaced_pod(self.namespace, label_selector=label_selector,
+                                                  include_uninitialized=True)
 
-            launcher = pod_launcher.PodLauncher(kube_client=client,
-                                                extract_xcom=self.do_xcom_push)
+            if len(pod_list.items) > 1:
+                raise AirflowException(
+                    'More than one pod running with labels: '
+                    '{label_selector}'.format(label_selector=label_selector))
 
-            try:
-                (final_state, result) = launcher.run_pod(
-                    pod,
-                    startup_timeout=self.startup_timeout_seconds,
-                    get_logs=self.get_logs)
-            except AirflowException:
-                if self.log_events_on_failure:
-                    for event in launcher.read_pod_events(pod).items:
-                        self.log.error("Pod Event: %s - %s", event.reason, event.message)
-                raise
-            finally:
-                if self.is_delete_operator_pod:
-                    launcher.delete_pod(pod)
+            launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push)
 
+            if len(pod_list.items) == 1 and self._try_numbers_do_not_match(context, pod_list[0]):
+                self.log.info("found a running pod with labels %s."
+                              "Will monitor instead of starting new one", labels)
+                launcher.delete_pod(pod_list[0])
+                final_state, _, result = self.create_new_pod_for_operator(labels, launcher)

Review comment:
       moved it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org