Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/23 19:38:19 UTC

[airflow] branch v1-10-test updated (33973ab -> dd7caa6)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 33973ab  Add extra options to fix SSL issue and be more flexible (#9409)
     new 2324175  Move KubernetesPodOperator into providers package
     new dd7caa6  Monitor pods by labels instead of names (#6377)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../contrib/operators/kubernetes_pod_operator.py   | 276 +------------------
 airflow/executors/kubernetes_executor.py           |  34 +--
 airflow/kubernetes/pod_generator.py                |  42 +++
 airflow/kubernetes/pod_launcher.py                 |  41 ++-
 .../cncf/kubernetes/operators/kubernetes_pod.py}   | 302 ++++++++++++++-------
 chart/charts/postgresql-6.3.12.tgz                 | Bin 0 -> 22754 bytes
 kubernetes_tests/test_kubernetes_pod_operator.py   | 182 +++++++++++--
 tests/executors/test_kubernetes_executor.py        |  17 +-
 8 files changed, 468 insertions(+), 426 deletions(-)
 copy airflow/{contrib/operators/kubernetes_pod_operator.py => providers/cncf/kubernetes/operators/kubernetes_pod.py} (57%)
 create mode 100644 chart/charts/postgresql-6.3.12.tgz


[airflow] 02/02: Monitor pods by labels instead of names (#6377)

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit dd7caa6721129aab42b0f46a58d758115fc731f3
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Sat May 16 14:13:58 2020 -0700

    Monitor pods by labels instead of names (#6377)
    
    * Monitor k8sPodOperator pods by labels
    
    To prevent situations where the scheduler starts a
    second k8sPodOperator pod after a restart, we now check
    for existing pods using kubernetes labels.
    
    * Update airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
    
    Co-authored-by: Kaxil Naik <ka...@gmail.com>
    
    * Update airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
    
    Co-authored-by: Kaxil Naik <ka...@gmail.com>
    
    * add docs
    
    * Update airflow/kubernetes/pod_launcher.py
    
    Co-authored-by: Kaxil Naik <ka...@gmail.com>
    
    Co-authored-by: Daniel Imberman <da...@astronomer.io>
    Co-authored-by: Kaxil Naik <ka...@gmail.com>
    (cherry picked from commit 8985df0bfcb5f2b2cd69a21b9814021f9f8ce953)
---
 airflow/executors/kubernetes_executor.py           |  34 +--
 airflow/kubernetes/pod_generator.py                |  42 +++
 airflow/kubernetes/pod_launcher.py                 |  41 ++-
 .../cncf/kubernetes/operators/kubernetes_pod.py    | 302 ++++++++++++++-------
 kubernetes_tests/test_kubernetes_pod_operator.py   | 182 +++++++++++--
 tests/executors/test_kubernetes_executor.py        |  17 +-
 6 files changed, 456 insertions(+), 162 deletions(-)
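
The label-based lookup at the heart of this change is a plain Kubernetes label
selector query. A minimal sketch of the idea, assuming a configured kubernetes
client and reusing the label values from the updated tests (try_number is
deliberately left out of the selector so a restarted scheduler can find the
pod from the previous try):

    from kubernetes import client, config

    config.load_kube_config()
    v1 = client.CoreV1Api()
    selector = "dag_id=dag,execution_date=2016-01-01T0100000100-a2f50a31f,task_id=task"
    pods = v1.list_namespaced_pod("default", label_selector=selector)
    print(len(pods.items))  # 0: start a new pod; 1: reattach or recreate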

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 8b5fdc1..2036f4f 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -33,6 +33,8 @@ from kubernetes.client.rest import ApiException
 from urllib3.exceptions import HTTPError, ReadTimeoutError
 
 from airflow.configuration import conf
+from airflow.kubernetes import pod_generator
+from airflow.kubernetes.pod_generator import MAX_POD_ID_LEN, PodGenerator
 from airflow.kubernetes.pod_launcher import PodLauncher
 from airflow.kubernetes.kube_client import get_kube_client
 from airflow.kubernetes.worker_configuration import WorkerConfiguration
@@ -45,7 +47,6 @@ from airflow import settings
 from airflow.exceptions import AirflowConfigException, AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
 
-MAX_POD_ID_LEN = 253
 MAX_LABEL_LEN = 63
 
 
@@ -402,8 +403,8 @@ class AirflowKubernetesScheduler(LoggingMixin):
             namespace=self.namespace,
             worker_uuid=self.worker_uuid,
             pod_id=self._create_pod_id(dag_id, task_id),
-            dag_id=self._make_safe_label_value(dag_id),
-            task_id=self._make_safe_label_value(task_id),
+            dag_id=pod_generator.make_safe_label_value(dag_id),
+            task_id=pod_generator.make_safe_label_value(task_id),
             try_number=try_number,
             execution_date=self._datetime_to_label_safe_datestring(execution_date),
             airflow_command=command
@@ -495,25 +496,6 @@ class AirflowKubernetesScheduler(LoggingMixin):
         return safe_pod_id
 
     @staticmethod
-    def _make_safe_label_value(string):
-        """
-        Valid label values must be 63 characters or less and must be empty or begin and
-        end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_),
-        dots (.), and alphanumerics between.
-
-        If the label value is then greater than 63 chars once made safe, or differs in any
-        way from the original value sent to this function, then we need to truncate to
-        53chars, and append it with a unique hash.
-        """
-        safe_label = re.sub(r'^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$', '', string)
-
-        if len(safe_label) > MAX_LABEL_LEN or string != safe_label:
-            safe_hash = hashlib.md5(string.encode()).hexdigest()[:9]
-            safe_label = safe_label[:MAX_LABEL_LEN - len(safe_hash) - 1] + "-" + safe_hash
-
-        return safe_label
-
-    @staticmethod
     def _create_pod_id(dag_id, task_id):
         safe_dag_id = AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(
             dag_id)
@@ -599,8 +581,8 @@ class AirflowKubernetesScheduler(LoggingMixin):
             )
             for task in tasks:
                 if (
-                    self._make_safe_label_value(task.dag_id) == dag_id and
-                    self._make_safe_label_value(task.task_id) == task_id and
+                    pod_generator.make_safe_label_value(task.dag_id) == dag_id and
+                    pod_generator.make_safe_label_value(task.task_id) == task_id and
                     task.execution_date == ex_time
                 ):
                     self.log.info(
@@ -683,8 +665,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
             # pylint: disable=protected-access
             dict_string = (
                 "dag_id={},task_id={},execution_date={},airflow-worker={}".format(
-                    AirflowKubernetesScheduler._make_safe_label_value(task.dag_id),
-                    AirflowKubernetesScheduler._make_safe_label_value(task.task_id),
+                    pod_generator.make_safe_label_value(task.dag_id),
+                    pod_generator.make_safe_label_value(task.task_id),
                     AirflowKubernetesScheduler._datetime_to_label_safe_datestring(
                         task.execution_date
                     ),
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 2a5a0df..e92918e 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -22,10 +22,20 @@ is supported and no serialization need be written.
 """
 
 import copy
+import hashlib
+import inspect
+import os
+import re
+import uuid
+from functools import reduce
+from typing import Dict, List, Optional, Union
+
 import kubernetes.client.models as k8s
 from airflow.executors import Executors
-import uuid
 
+MAX_POD_ID_LEN = 253
+
+MAX_LABEL_LEN = 63
+
 
 class PodDefaults:
     """
@@ -55,6 +65,25 @@ class PodDefaults:
     )
 
 
+def make_safe_label_value(string):
+    """
+    Valid label values must be 63 characters or less and must be empty or begin and
+    end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_),
+    dots (.), and alphanumerics between.
+
+    If the label value is greater than 63 chars once made safe, or differs in any
+    way from the original value sent to this function, then we need to truncate to
+    53 chars, and append it with a unique hash.
+    """
+    safe_label = re.sub(r"^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$", "", string)
+
+    if len(safe_label) > MAX_LABEL_LEN or string != safe_label:
+        safe_hash = hashlib.md5(string.encode()).hexdigest()[:9]
+        safe_label = safe_label[:MAX_LABEL_LEN - len(safe_hash) - 1] + "-" + safe_hash
+
+    return safe_label
+
+
 class PodGenerator:
     """
     Contains Kubernetes Airflow Worker configuration logic
@@ -201,9 +230,22 @@ class PodGenerator:
         if self.extract_xcom:
             result = self.add_sidecar(result)
 
+        result.metadata.name = self.make_unique_pod_id(result.metadata.name)
         return result
 
     @staticmethod
+    def make_unique_pod_id(dag_id):
+        """
+        Kubernetes pod names must be <= 253 chars and must pass the following regex for
+        validation
+        ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``
+        :param dag_id: a dag_id with only alphanumeric characters
+        :return: ``str`` valid Pod name of appropriate length
+        """
+        if not dag_id:
+            return None
+
+        safe_uuid = uuid.uuid4().hex
+        safe_pod_id = dag_id[:MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid
+
+        return safe_pod_id
+
+    @staticmethod
     def add_sidecar(pod):
         pod_cp = copy.deepcopy(pod)
 
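
A doctest-style sketch of the helpers added above (the truncate-and-hash case
matches the updated executor test; the unique pod id carries a random 32-char
hex suffix, so the value shown is only illustrative):

    >>> from airflow.kubernetes.pod_generator import PodGenerator, make_safe_label_value
    >>> make_safe_label_value("my_dag_id")  # already valid and <= 63 chars: unchanged
    'my_dag_id'
    >>> make_safe_label_value("my_dag_id_" + "a" * 64) == "my_dag_id_" + "a" * 43 + "-0ce114c45"
    True
    >>> PodGenerator.make_unique_pod_id("mydag")  # doctest: +SKIP
    'mydag-7b9d5e6f2c4a4b8e9f0a1b2c3d4e5f6a'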
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 47d8ed5..62360c5 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -89,13 +89,16 @@ class PodLauncher(LoggingMixin):
             if e.status != 404:
                 raise
 
-    def run_pod(self, pod, startup_timeout=120, get_logs=True):
+    def start_pod(
+            self,
+            pod,
+            startup_timeout):
         """
         Launches the pod synchronously and waits for it to start.
-        Args:
-            pod (Pod):
-            startup_timeout (int): Timeout for startup of the pod (if pod is pending for
-             too long, considers task a failure
+
+        :param pod: the V1Pod to launch
+        :param startup_timeout: Timeout for startup of the pod (if pod is pending for too long, fails task)
+        :return:
         """
         resp = self.run_pod_async(pod)
         curr_time = dt.now()
@@ -107,9 +110,14 @@ class PodLauncher(LoggingMixin):
                 time.sleep(1)
             self.log.debug('Pod not yet started')
 
-        return self._monitor_pod(pod, get_logs)
 
-    def _monitor_pod(self, pod, get_logs):
+    def monitor_pod(self, pod, get_logs):
+        """
+        :param pod: pod spec that will be monitored
+        :type pod: V1Pod
+        :param get_logs: whether to read the logs locally
+        :return: Tuple[State, Optional[str]]
+        """
 
         if get_logs:
             logs = self.read_pod_logs(pod)
@@ -180,7 +188,24 @@ class PodLauncher(LoggingMixin):
         wait=tenacity.wait_exponential(),
         reraise=True
     )
-    def read_pod(self, pod):
+    def read_pod_events(self, pod):
+        """Reads events from the POD"""
+        try:
+            return self._client.list_namespaced_event(
+                namespace=pod.metadata.namespace,
+                field_selector="involvedObject.name={}".format(pod.metadata.name)
+            )
+        except BaseHTTPError as e:
+            raise AirflowException(
+                'There was an error reading the kubernetes API: {}'.format(e)
+            )
+
+    @tenacity.retry(
+        stop=tenacity.stop_after_attempt(3),
+        wait=tenacity.wait_exponential(),
+        reraise=True
+    )
+    def read_pod(self, pod: V1Pod):
         """Read POD information"""
         try:
             return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
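
With run_pod split in two, callers now drive the launch and monitoring phases
explicitly. A minimal sketch of the new call pattern, assuming a kube client
and a generated V1Pod (this mirrors the operator code in the next file):

    from airflow.kubernetes import pod_launcher
    from airflow.utils.state import State

    launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=False)
    launcher.start_pod(pod, startup_timeout=120)  # returns once the pod leaves Pending
    final_state, result = launcher.monitor_pod(pod=pod, get_logs=True)
    if final_state != State.SUCCESS:
        # new helper: fetch pod events via an involvedObject.name field selector
        for event in launcher.read_pod_events(pod).items:
            print(event.reason, event.message)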
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index b89a37f..6742309 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -80,6 +80,12 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
     :param cluster_context: context that points to kubernetes cluster.
         Ignored when in_cluster is True. If None, current-context is used.
     :type cluster_context: str
+    :param reattach_on_restart: if the scheduler dies while the pod is running, reattach to the
+        existing pod and resume monitoring it instead of launching a duplicate
+    :type reattach_on_restart: bool
+    :param labels: labels to apply to the Pod.
+    :type labels: dict
+    :param startup_timeout_seconds: timeout in seconds to startup the pod.
+    :type startup_timeout_seconds: int
     :param get_logs: get the stdout of the container as logs of the tasks.
     :type get_logs: bool
     :param annotations: non-identifying metadata you can attach to the Pod.
@@ -126,90 +132,11 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
     """
     template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
 
-    def execute(self, context):
-        try:
-            client = kube_client.get_kube_client(in_cluster=self.in_cluster,
-                                                 cluster_context=self.cluster_context,
-                                                 config_file=self.config_file)
-            # Add Airflow Version to the label
-            # And a label to identify that pod is launched by KubernetesPodOperator
-            self.labels.update(
-                {
-                    'airflow_version': airflow_version.replace('+', '-'),
-                    'kubernetes_pod_operator': 'True',
-                }
-            )
-
-            pod = pod_generator.PodGenerator(
-                image=self.image,
-                namespace=self.namespace,
-                cmds=self.cmds,
-                args=self.arguments,
-                labels=self.labels,
-                name=self.name,
-                envs=self.env_vars,
-                extract_xcom=self.do_xcom_push,
-                image_pull_policy=self.image_pull_policy,
-                node_selectors=self.node_selectors,
-                priority_class_name=self.priority_class_name,
-                annotations=self.annotations,
-                affinity=self.affinity,
-                init_containers=self.init_containers,
-                image_pull_secrets=self.image_pull_secrets,
-                service_account_name=self.service_account_name,
-                hostnetwork=self.hostnetwork,
-                tolerations=self.tolerations,
-                configmaps=self.configmaps,
-                security_context=self.security_context,
-                dnspolicy=self.dnspolicy,
-                pod=self.full_pod_spec,
-            ).gen_pod()
-
-            pod = append_to_pod(
-                pod,
-                self.pod_runtime_info_envs +
-                self.ports +
-                self.resources +
-                self.secrets +
-                self.volumes +
-                self.volume_mounts
-            )
-
-            self.pod = pod
-
-            launcher = pod_launcher.PodLauncher(kube_client=client,
-                                                extract_xcom=self.do_xcom_push)
-
-            try:
-                (final_state, result) = launcher.run_pod(
-                    pod,
-                    startup_timeout=self.startup_timeout_seconds,
-                    get_logs=self.get_logs)
-            finally:
-                if self.is_delete_operator_pod:
-                    launcher.delete_pod(pod)
-
-            if final_state != State.SUCCESS:
-                raise AirflowException(
-                    'Pod returned a failure: {state}'.format(state=final_state)
-                )
-
-            return result
-        except AirflowException as ex:
-            raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
-
-    def _set_resources(self, resources):
-        return [Resources(**resources) if resources else Resources()]
-
-    def _set_name(self, name):
-        validate_key(name, max_length=220)
-        return re.sub(r'[^a-z0-9.-]+', '-', name.lower())
-
     @apply_defaults
     def __init__(self,  # pylint: disable=too-many-arguments,too-many-locals
-                 namespace,
-                 image,
-                 name,
+                 namespace=None,
+                 image=None,
+                 name=None,
                  cmds=None,
                  arguments=None,
                  ports=None,
@@ -220,15 +147,14 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                  in_cluster=None,
                  cluster_context=None,
                  labels=None,
+                 reattach_on_restart=True,
                  startup_timeout_seconds=120,
                  get_logs=True,
                  image_pull_policy='IfNotPresent',
                  annotations=None,
                  resources=None,
                  affinity=None,
-                 init_containers=None,
                  config_file=None,
-                 do_xcom_push=False,
                  node_selectors=None,
                  image_pull_secrets=None,
                  service_account_name='default',
@@ -239,18 +165,19 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                  security_context=None,
                  pod_runtime_info_envs=None,
                  dnspolicy=None,
+                 schedulername=None,
                  full_pod_spec=None,
+                 init_containers=None,
+                 log_events_on_failure=False,
+                 do_xcom_push=False,
+                 pod_template_file=None,
                  priority_class_name=None,
                  *args,
                  **kwargs):
-        # https://github.com/apache/airflow/blob/2d0eff4ee4fafcf8c7978ac287a8fb968e56605f/UPDATING.md#unification-of-do_xcom_push-flag
         if kwargs.get('xcom_push') is not None:
-            kwargs['do_xcom_push'] = kwargs.pop('xcom_push')
-            warnings.warn(
-                "`xcom_push` will be deprecated. Use `do_xcom_push` instead.",
-                DeprecationWarning, stacklevel=2
-            )
-        super(KubernetesPodOperator, self).__init__(*args, resources=None, **kwargs)
+            raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
+        super().__init__(*args, resources=None, **kwargs)
+
         self.pod = None
         self.do_xcom_push = do_xcom_push
         self.image = image
@@ -259,16 +186,14 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         self.arguments = arguments or []
         self.labels = labels or {}
         self.startup_timeout_seconds = startup_timeout_seconds
-        self.name = self._set_name(name)
         self.env_vars = env_vars or {}
         self.ports = ports or []
-        self.init_containers = init_containers or []
-        self.priority_class_name = priority_class_name
         self.volume_mounts = volume_mounts or []
         self.volumes = volumes or []
         self.secrets = secrets or []
         self.in_cluster = in_cluster
         self.cluster_context = cluster_context
+        self.reattach_on_restart = reattach_on_restart
         self.get_logs = get_logs
         self.image_pull_policy = image_pull_policy
         self.node_selectors = node_selectors or {}
@@ -285,4 +210,193 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         self.security_context = security_context or {}
         self.pod_runtime_info_envs = pod_runtime_info_envs or []
         self.dnspolicy = dnspolicy
+        self.schedulername = schedulername
         self.full_pod_spec = full_pod_spec
+        self.init_containers = init_containers or []
+        self.log_events_on_failure = log_events_on_failure
+        self.priority_class_name = priority_class_name
+        self.pod_template_file = pod_template_file
+        self.name = self._set_name(name)
+
+    @staticmethod
+    def create_labels_for_pod(context) -> dict:
+        """
+        Generate labels for the pod to track the pod in case of Operator crash
+
+        :param context: task context provided by airflow DAG
+        :return: dict
+        """
+        labels = {
+            'dag_id': context['dag'].dag_id,
+            'task_id': context['task'].task_id,
+            'execution_date': context['ts'],
+            'try_number': context['ti'].try_number,
+        }
+        # In the case of sub dags, also record the parent dag id
+        if context['dag'].is_subdag:
+            labels['parent_dag_id'] = context['dag'].parent_dag.dag_id
+        # Ensure that label is valid for Kube,
+        # and if not truncate/remove invalid chars and replace with short hash.
+        for label_id, label in labels.items():
+            safe_label = pod_generator.make_safe_label_value(str(label))
+            labels[label_id] = safe_label
+        return labels
+
+    def execute(self, context):
+        try:
+            if self.in_cluster is not None:
+                client = kube_client.get_kube_client(in_cluster=self.in_cluster,
+                                                     cluster_context=self.cluster_context,
+                                                     config_file=self.config_file)
+            else:
+                client = kube_client.get_kube_client(cluster_context=self.cluster_context,
+                                                     config_file=self.config_file)
+
+            # Add combination of labels to uniquely identify a running pod
+            labels = self.create_labels_for_pod(context)
+
+            label_selector = self._get_pod_identifying_label_string(labels)
+
+            pod_list = client.list_namespaced_pod(self.namespace, label_selector=label_selector)
+
+            if len(pod_list.items) > 1:
+                raise AirflowException(
+                    'More than one pod running with labels: '
+                    '{label_selector}'.format(label_selector=label_selector))
+
+            launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push)
+
+            if len(pod_list.items) == 1 and \
+                    self._try_numbers_do_not_match(context, pod_list.items[0]) and \
+                    self.reattach_on_restart:
+                self.log.info("found a running pod with labels %s but a different try_number. "
+                              "Will start a new pod and monitor it.", labels)
+                final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
+            elif len(pod_list.items) == 1:
+                self.log.info("found a running pod with labels %s. "
+                              "Will monitor this pod instead of starting a new one.", labels)
+                final_state, result = self.monitor_launched_pod(launcher, pod_list.items[0])
+            else:
+                final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
+            if final_state != State.SUCCESS:
+                raise AirflowException(
+                    'Pod returned a failure: {state}'.format(state=final_state))
+            return result
+        except AirflowException as ex:
+            raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
+
+    @staticmethod
+    def _get_pod_identifying_label_string(labels):
+        filtered_labels = {label_id: label for label_id, label in labels.items() if label_id != 'try_number'}
+        return ','.join([label_id + '=' + label for label_id, label in sorted(filtered_labels.items())])
+
+    @staticmethod
+    def _try_numbers_do_not_match(context, pod):
+        # pod labels are strings, so compare with the stringified try_number
+        return pod.metadata.labels['try_number'] != str(context['ti'].try_number)
+
+    @staticmethod
+    def _set_resources(resources):
+        if not resources:
+            return []
+        return [Resources(**resources)]
+
+    def _set_name(self, name):
+        if self.pod_template_file or self.full_pod_spec:
+            return None
+        validate_key(name, max_length=220)
+        return re.sub(r'[^a-z0-9.-]+', '-', name.lower())
+
+    def create_new_pod_for_operator(self, labels, launcher):
+        """
+        Creates a new pod and monitors it for the duration of the task
+
+        :param labels: labels used to track the pod
+        :param launcher: pod launcher that will manage launching and monitoring the pod
+        :return: Tuple[State, V1Pod, Optional[str]]
+        """
+        if not (self.full_pod_spec or self.pod_template_file):
+            # Add Airflow Version to the label
+            # And a label to identify that pod is launched by KubernetesPodOperator
+            self.labels.update(
+                {
+                    'airflow_version': airflow_version.replace('+', '-'),
+                    'kubernetes_pod_operator': 'True',
+                }
+            )
+            self.labels.update(labels)
+        pod = pod_generator.PodGenerator(
+            image=self.image,
+            namespace=self.namespace,
+            cmds=self.cmds,
+            args=self.arguments,
+            labels=self.labels,
+            name=self.name,
+            envs=self.env_vars,
+            extract_xcom=self.do_xcom_push,
+            image_pull_policy=self.image_pull_policy,
+            node_selectors=self.node_selectors,
+            annotations=self.annotations,
+            affinity=self.affinity,
+            image_pull_secrets=self.image_pull_secrets,
+            service_account_name=self.service_account_name,
+            hostnetwork=self.hostnetwork,
+            tolerations=self.tolerations,
+            configmaps=self.configmaps,
+            security_context=self.security_context,
+            dnspolicy=self.dnspolicy,
+            schedulername=self.schedulername,
+            init_containers=self.init_containers,
+            restart_policy='Never',
+            priority_class_name=self.priority_class_name,
+            pod=self.full_pod_spec,
+        ).gen_pod()
+
+        # noinspection PyTypeChecker
+        pod = append_to_pod(
+            pod,
+            self.pod_runtime_info_envs +  # type: ignore
+            self.ports +  # type: ignore
+            self.resources +  # type: ignore
+            self.secrets +  # type: ignore
+            self.volumes +  # type: ignore
+            self.volume_mounts  # type: ignore
+        )
+
+        self.pod = pod
+
+        try:
+            launcher.start_pod(
+                pod,
+                startup_timeout=self.startup_timeout_seconds)
+            final_state, result = launcher.monitor_pod(pod=pod, get_logs=self.get_logs)
+        except AirflowException:
+            if self.log_events_on_failure:
+                for event in launcher.read_pod_events(pod).items:
+                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            raise
+        finally:
+            if self.is_delete_operator_pod:
+                launcher.delete_pod(pod)
+        return final_state, pod, result
+
+    def monitor_launched_pod(self, launcher, pod):
+        """
+        Monitors a pod, created by a previous run of this KubernetesPodOperator, to completion
+
+        :param launcher: pod launcher that will manage launching and monitoring pods
+        :param pod: pod spec used to find the pod via the k8s API
+        :return: Tuple[State, Optional[str]]
+        """
+        try:
+            (final_state, result) = launcher.monitor_pod(pod, get_logs=self.get_logs)
+        finally:
+            if self.is_delete_operator_pod:
+                launcher.delete_pod(pod)
+        if final_state != State.SUCCESS:
+            if self.log_events_on_failure:
+                for event in launcher.read_pod_events(pod).items:
+                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            raise AirflowException(
+                'Pod returned a failure: {state}'.format(state=final_state)
+            )
+        return final_state, result
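
For reference, the new operator knobs introduced here, in a hedged usage
sketch assembled from the test fixtures below:

    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

    k = KubernetesPodOperator(
        namespace='default',
        image='ubuntu:16.04',
        cmds=['bash', '-cx'],
        arguments=['echo 10'],
        name='test',
        task_id='task',
        reattach_on_restart=True,     # default: reattach to a matching pod after a scheduler restart
        log_events_on_failure=True,   # dump pod events to the task log if the pod fails
        schedulername='default-scheduler',
        is_delete_operator_pod=True,  # clean the pod up on completion or interruption
    )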
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 814318f..e737ce5 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -26,6 +26,7 @@ from tests.compat import mock, patch
 
 import kubernetes.client.models as k8s
 import pendulum
+import pytest
 from kubernetes.client.api_client import ApiClient
 from kubernetes.client.rest import ApiException
 
@@ -74,11 +75,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
                 'labels': {
                     'foo': 'bar', 'kubernetes_pod_operator': 'True',
                     'airflow_version': airflow_version.replace('+', '-'),
-                    # 'execution_date': '2016-01-01T0100000100-a2f50a31f',
-                    # 'dag_id': 'dag',
-                    # 'task_id': 'task',
-                    # 'try_number': '1'
-                },
+                    'execution_date': '2016-01-01T0100000100-a2f50a31f',
+                    'dag_id': 'dag',
+                    'task_id': 'task',
+                    'try_number': '1'},
             },
             'spec': {
                 'affinity': {},
@@ -109,10 +109,23 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             }
         }
 
-    def tearDown(self):
+    def tearDown(self) -> None:
         client = kube_client.get_kube_client(in_cluster=False)
         client.delete_collection_namespaced_pod(namespace="default")
 
+    def create_context(self, task):
+        dag = DAG(dag_id="dag")
+        tzinfo = pendulum.timezone("Europe/Amsterdam")
+        execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo)
+        task_instance = TaskInstance(task=task,
+                                     execution_date=execution_date)
+        return {
+            "dag": dag,
+            "ts": execution_date.isoformat(),
+            "task": task,
+            "ti": task_instance,
+        }
+
     def test_do_xcom_push_defaults_false(self):
         new_config_path = '/tmp/kube_config'
         old_config_path = os.path.expanduser('~/.kube/config')
@@ -149,11 +162,98 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             do_xcom_push=False,
             config_file=new_config_path,
         )
-        context = create_context(k)
+        context = self.create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.assertEqual(self.expected_pod, actual_pod)
 
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_config_path(self, client_mock, monitor_mock, start_mock):  # pylint: disable=unused-argument
+        from airflow.utils.state import State
+
+        file_path = "/tmp/fake_file"
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            in_cluster=False,
+            do_xcom_push=False,
+            config_file=file_path,
+            cluster_context='default',
+        )
+        monitor_mock.return_value = (State.SUCCESS, None)
+        client_mock.list_namespaced_pod.return_value = []
+        context = self.create_context(k)
+        k.execute(context=context)
+        client_mock.assert_called_once_with(
+            in_cluster=False,
+            cluster_context='default',
+            config_file=file_path,
+        )
+
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_image_pull_secrets_correctly_set(self, mock_client, monitor_mock, start_mock):
+        from airflow.utils.state import State
+
+        fake_pull_secrets = "fakeSecret"
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            in_cluster=False,
+            do_xcom_push=False,
+            image_pull_secrets=fake_pull_secrets,
+            cluster_context='default',
+        )
+        monitor_mock.return_value = (State.SUCCESS, None)
+        context = self.create_context(k)
+        k.execute(context=context)
+        self.assertEqual(
+            start_mock.call_args[0][0].spec.image_pull_secrets,
+            [k8s.V1LocalObjectReference(name=fake_pull_secrets)]
+        )
+
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.delete_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_pod_delete_even_on_launcher_error(
+            self,
+            mock_client,
+            delete_pod_mock,
+            monitor_pod_mock,
+            start_pod_mock):  # pylint: disable=unused-argument
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            in_cluster=False,
+            do_xcom_push=False,
+            cluster_context='default',
+            is_delete_operator_pod=True,
+        )
+        monitor_pod_mock.side_effect = AirflowException('fake failure')
+        with self.assertRaises(AirflowException):
+            context = self.create_context(k)
+            k.execute(context=context)
+        assert delete_pod_mock.called
+
     def test_working_pod(self):
         k = KubernetesPodOperator(
             namespace='default',
@@ -185,7 +285,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             do_xcom_push=False,
             is_delete_operator_pod=True,
         )
-        context = create_context(k)
+        context = self.create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
@@ -204,7 +304,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             do_xcom_push=False,
             hostnetwork=True,
         )
-        context = create_context(k)
+        context = self.create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['hostNetwork'] = True
@@ -226,7 +326,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             hostnetwork=True,
             dnspolicy=dns_policy
         )
-        context = create_context(k)
+        context = self.create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['hostNetwork'] = True
@@ -234,6 +334,28 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
         self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])
 
+    def test_pod_schedulername(self):
+        scheduler_name = "default-scheduler"
+        k = KubernetesPodOperator(
+            namespace="default",
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            in_cluster=False,
+            do_xcom_push=False,
+            schedulername=scheduler_name
+        )
+        context = self.create_context(k)
+        k.execute(context)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['schedulerName'] = scheduler_name
+        self.assertEqual(self.expected_pod, actual_pod)
+        self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
+        self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])
+
     def test_pod_node_selectors(self):
         node_selectors = {
             'beta.kubernetes.io/os': 'linux'
@@ -275,7 +397,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             do_xcom_push=False,
             resources=resources,
         )
-        context = create_context(k)
+        context = self.create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['containers'][0]['resources'] = {
@@ -342,7 +464,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             do_xcom_push=False,
             ports=[port],
         )
-        context = create_context(k)
+        context = self.create_context(k)
         k.execute(context=context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['containers'][0]['ports'] = [{
@@ -564,9 +686,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         self.expected_pod['spec']['containers'].append(container)
         self.assertEqual(self.expected_pod, actual_pod)
 
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
-    @patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_envs_from_configmaps(self, mock_client, mock_run):
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_envs_from_configmaps(self, mock_client, mock_monitor, mock_start):
         # GIVEN
         from airflow.utils.state import State
 
@@ -585,19 +708,20 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             configmaps=[configmap],
         )
         # THEN
-        mock_run.return_value = (State.SUCCESS, None)
-        context = create_context(k)
+        mock_monitor.return_value = (State.SUCCESS, None)
+        context = self.create_context(k)
         k.execute(context)
         self.assertEqual(
-            mock_run.call_args[0][0].spec.containers[0].env_from,
+            mock_start.call_args[0][0].spec.containers[0].env_from,
             [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(
                 name=configmap
             ))]
         )
 
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
-    @patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_envs_from_secrets(self, mock_client, mock_run):
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+    def test_envs_from_secrets(self, mock_client, monitor_mock, start_mock):
         # GIVEN
         from airflow.utils.state import State
         secret_ref = 'secret_name'
@@ -616,11 +740,11 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             do_xcom_push=False,
         )
         # THEN
-        mock_run.return_value = (State.SUCCESS, None)
-        context = create_context(k)
+        monitor_mock.return_value = (State.SUCCESS, None)
+        context = self.create_context(k)
         k.execute(context)
         self.assertEqual(
-            mock_run.call_args[0][0].spec.containers[0].env_from,
+            start_mock.call_args[0][0].spec.containers[0].env_from,
             [k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(
                 name=secret_ref
             ))]
@@ -704,12 +828,14 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         }]
         self.assertEqual(self.expected_pod, actual_pod)
 
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
     @patch("airflow.kubernetes.kube_client.get_kube_client")
     def test_pod_priority_class_name(
             self,
             mock_client,
-            run_mock):  # pylint: disable=unused-argument
+            monitor_mock,
+            start_mock):  # pylint: disable=unused-argument
         """Test ability to assign priorityClassName to pod
 
         """
@@ -729,8 +855,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             priority_class_name=priority_class_name,
         )
 
-        run_mock.return_value = (State.SUCCESS, None)
-        context = create_context(k)
+        monitor_mock.return_value = (State.SUCCESS, None)
+        context = self.create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['priorityClassName'] = priority_class_name
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 77299f6..4bb6fcf 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -33,6 +33,9 @@ try:
     from airflow.configuration import conf  # noqa: F401
     from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
     from airflow.executors.kubernetes_executor import KubernetesExecutor
+    from airflow.executors.kubernetes_executor import KubeConfig
+    from airflow.kubernetes import pod_generator
+    from airflow.kubernetes.pod_generator import PodGenerator
     from airflow.utils.state import State
 except ImportError:
     AirflowKubernetesScheduler = None  # type: ignore
@@ -87,24 +90,26 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
                      'kubernetes python package is not installed')
     def test_create_pod_id(self):
         for dag_id, task_id in self._cases():
-            pod_name = AirflowKubernetesScheduler._create_pod_id(dag_id, task_id)
+            pod_name = PodGenerator.make_unique_pod_id(
+                AirflowKubernetesScheduler._create_pod_id(dag_id, task_id)
+            )
             self.assertTrue(self._is_valid_pod_id(pod_name))
 
     def test_make_safe_label_value(self):
         for dag_id, task_id in self._cases():
-            safe_dag_id = AirflowKubernetesScheduler._make_safe_label_value(dag_id)
+            safe_dag_id = pod_generator.make_safe_label_value(dag_id)
             self.assertTrue(self._is_safe_label_value(safe_dag_id))
-            safe_task_id = AirflowKubernetesScheduler._make_safe_label_value(task_id)
+            safe_task_id = pod_generator.make_safe_label_value(task_id)
             self.assertTrue(self._is_safe_label_value(safe_task_id))
             id = "my_dag_id"
             self.assertEqual(
-                id,
-                AirflowKubernetesScheduler._make_safe_label_value(id)
+                id,
+                pod_generator.make_safe_label_value(id)
             )
             id = "my_dag_id_" + "a" * 64
             self.assertEqual(
                 "my_dag_id_" + "a" * 43 + "-0ce114c45",
-                AirflowKubernetesScheduler._make_safe_label_value(id)
+                pod_generator.make_safe_label_value(id)
             )
 
     @unittest.skipIf(AirflowKubernetesScheduler is None,


[airflow] 01/02: Move KubernetesPodOperator into providers package

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2324175fab7aff0d35651825e48eb7d651c8a845
Author: Daniel Imberman <da...@astronomer.io>
AuthorDate: Tue Jun 23 11:49:51 2020 -0700

    Move KubernetesPodOperator into providers package
    
    This commit will make merging PRs simpler while also
    maintaining existing paths.
---
 .../contrib/operators/kubernetes_pod_operator.py   | 276 +--------------------
 .../cncf/kubernetes/operators/kubernetes_pod.py}   |   0
 chart/charts/postgresql-6.3.12.tgz                 | Bin 0 -> 22754 bytes
 3 files changed, 12 insertions(+), 264 deletions(-)
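
Because the old contrib path is kept as a thin deprecated shim over the new
providers module, both import paths resolve to the same operator. A quick
sketch of what that means in practice:

    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
        KubernetesPodOperator as ProvidersOp,
    )
    from airflow.contrib.operators.kubernetes_pod_operator import (
        KubernetesPodOperator as ContribOp,
    )

    assert issubclass(ContribOp, ProvidersOp)
    # instantiating ContribOp emits a DeprecationWarning pointing at the new path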

diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index b89a37f..382f965 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -17,272 +17,20 @@
 """Executes task in a Kubernetes POD"""
 import warnings
 
-import re
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator as K8sPodOp
 
-from airflow.exceptions import AirflowException
-from airflow.kubernetes import kube_client, pod_generator, pod_launcher
-from airflow.kubernetes.k8s_model import append_to_pod
-from airflow.kubernetes.pod import Resources
-from airflow.models import BaseOperator
-from airflow.utils.decorators import apply_defaults
-from airflow.utils.helpers import validate_key
-from airflow.utils.state import State
-from airflow.version import version as airflow_version
 
-
-class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-attributes
+class KubernetesPodOperator(K8sPodOp):
     """
-    Execute a task in a Kubernetes Pod
-
-    .. note::
-        If you use `Google Kubernetes Engine <https://cloud.google.com/kubernetes-engine/>`__, use
-        :class:`~airflow.gcp.operators.kubernetes_engine.GKEPodOperator`, which
-        simplifies the authorization process.
-
-    :param image: Docker image you wish to launch. Defaults to hub.docker.com,
-        but fully qualified URLS will point to custom repositories.
-    :type image: str
-    :param name: name of the pod in which the task will run, will be used (plus a random
-        suffix) to generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).
-    :type name: str
-    :param cmds: entrypoint of the container. (templated)
-        The docker images's entrypoint is used if this is not provided.
-    :type cmds: list[str]
-    :param arguments: arguments of the entrypoint. (templated)
-        The docker image's CMD is used if this is not provided.
-    :type arguments: list[str]
-    :param image_pull_policy: Specify a policy to cache or always pull an image.
-    :type image_pull_policy: str
-    :param image_pull_secrets: Any image pull secrets to be given to the pod.
-                               If more than one secret is required, provide a
-                               comma separated list: secret_a,secret_b
-    :type image_pull_secrets: str
-    :param ports: ports for launched pod.
-    :type ports: list[airflow.kubernetes.pod.Port]
-    :param volume_mounts: volumeMounts for launched pod.
-    :type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
-    :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
-    :type volumes: list[airflow.kubernetes.volume.Volume]
-    :param labels: labels to apply to the Pod.
-    :type labels: dict
-    :param startup_timeout_seconds: timeout in seconds to startup the pod.
-    :type startup_timeout_seconds: int
-    :param name: name of the pod in which the task will run, will be used to
-        generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).
-    :type name: str
-    :param env_vars: Environment variables initialized in the container. (templated)
-    :type env_vars: dict
-    :param secrets: Kubernetes secrets to inject in the container.
-        They can be exposed as environment vars or files in a volume.
-    :type secrets: list[airflow.kubernetes.secret.Secret]
-    :param in_cluster: run kubernetes client with in_cluster configuration.
-    :type in_cluster: bool
-    :param cluster_context: context that points to kubernetes cluster.
-        Ignored when in_cluster is True. If None, current-context is used.
-    :type cluster_context: str
-    :param get_logs: get the stdout of the container as logs of the tasks.
-    :type get_logs: bool
-    :param annotations: non-identifying metadata you can attach to the Pod.
-                        Can be a large range of data, and can include characters
-                        that are not permitted by labels.
-    :type annotations: dict
-    :param resources: A dict containing resources requests and limits.
-        Possible keys are request_memory, request_cpu, limit_memory, limit_cpu,
-        and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources.
-        See also kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
-    :type resources: dict
-    :param affinity: A dict containing a group of affinity scheduling rules.
-    :type affinity: dict
-    :param node_selectors: A dict containing a group of scheduling rules.
-    :type node_selectors: dict
-    :param config_file: The path to the Kubernetes config file. (templated)
-    :param config_file: The path to the Kubernetes config file. (templated)
-        If not specified, default value is ``~/.kube/config``
-    :type config_file: str
-    :param do_xcom_push: If do_xcom_push is True, the content of the file
-        /airflow/xcom/return.json in the container will also be pushed to an
-        XCom when the container completes.
-    :type do_xcom_push: bool
-    :param is_delete_operator_pod: What to do when the pod reaches its final
-        state, or the execution is interrupted.
-        If False (default): do nothing, If True: delete the pod
-    :type is_delete_operator_pod: bool
-    :param hostnetwork: If True enable host networking on the pod.
-    :type hostnetwork: bool
-    :param tolerations: A list of kubernetes tolerations.
-    :type tolerations: list tolerations
-    :param configmaps: A list of configmap names objects that we
-        want mount as env variables.
-    :type configmaps: list[str]
-    :param pod_runtime_info_envs: environment variables about
-                                  pod runtime information (ip, namespace, nodeName, podName).
-    :type pod_runtime_info_envs: list[airflow.kubernetes.pod_runtime_info_env.PodRuntimeInfoEnv]
-    :param security_context: security options the pod should run with (PodSecurityContext).
-    :type security_context: dict
-    :param dnspolicy: dnspolicy for the pod.
-    :type dnspolicy: str
-    :param full_pod_spec: The complete podSpec
-    :type full_pod_spec: kubernetes.client.models.V1Pod
+    This class is deprecated.
+    Please use `airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator`.
     """
-    template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
-
-    def execute(self, context):
-        try:
-            client = kube_client.get_kube_client(in_cluster=self.in_cluster,
-                                                 cluster_context=self.cluster_context,
-                                                 config_file=self.config_file)
-            # Add Airflow Version to the label
-            # And a label to identify that pod is launched by KubernetesPodOperator
-            self.labels.update(
-                {
-                    'airflow_version': airflow_version.replace('+', '-'),
-                    'kubernetes_pod_operator': 'True',
-                }
-            )
-
-            pod = pod_generator.PodGenerator(
-                image=self.image,
-                namespace=self.namespace,
-                cmds=self.cmds,
-                args=self.arguments,
-                labels=self.labels,
-                name=self.name,
-                envs=self.env_vars,
-                extract_xcom=self.do_xcom_push,
-                image_pull_policy=self.image_pull_policy,
-                node_selectors=self.node_selectors,
-                priority_class_name=self.priority_class_name,
-                annotations=self.annotations,
-                affinity=self.affinity,
-                init_containers=self.init_containers,
-                image_pull_secrets=self.image_pull_secrets,
-                service_account_name=self.service_account_name,
-                hostnetwork=self.hostnetwork,
-                tolerations=self.tolerations,
-                configmaps=self.configmaps,
-                security_context=self.security_context,
-                dnspolicy=self.dnspolicy,
-                pod=self.full_pod_spec,
-            ).gen_pod()
-
-            pod = append_to_pod(
-                pod,
-                self.pod_runtime_info_envs +
-                self.ports +
-                self.resources +
-                self.secrets +
-                self.volumes +
-                self.volume_mounts
-            )
-
-            self.pod = pod
-
-            launcher = pod_launcher.PodLauncher(kube_client=client,
-                                                extract_xcom=self.do_xcom_push)
-
-            try:
-                (final_state, result) = launcher.run_pod(
-                    pod,
-                    startup_timeout=self.startup_timeout_seconds,
-                    get_logs=self.get_logs)
-            finally:
-                if self.is_delete_operator_pod:
-                    launcher.delete_pod(pod)
-
-            if final_state != State.SUCCESS:
-                raise AirflowException(
-                    'Pod returned a failure: {state}'.format(state=final_state)
-                )
-
-            return result
-        except AirflowException as ex:
-            raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
-
-    def _set_resources(self, resources):
-        return [Resources(**resources) if resources else Resources()]
-
-    def _set_name(self, name):
-        validate_key(name, max_length=220)
-        return re.sub(r'[^a-z0-9.-]+', '-', name.lower())
 
-    @apply_defaults
-    def __init__(self,  # pylint: disable=too-many-arguments,too-many-locals
-                 namespace,
-                 image,
-                 name,
-                 cmds=None,
-                 arguments=None,
-                 ports=None,
-                 volume_mounts=None,
-                 volumes=None,
-                 env_vars=None,
-                 secrets=None,
-                 in_cluster=None,
-                 cluster_context=None,
-                 labels=None,
-                 startup_timeout_seconds=120,
-                 get_logs=True,
-                 image_pull_policy='IfNotPresent',
-                 annotations=None,
-                 resources=None,
-                 affinity=None,
-                 init_containers=None,
-                 config_file=None,
-                 do_xcom_push=False,
-                 node_selectors=None,
-                 image_pull_secrets=None,
-                 service_account_name='default',
-                 is_delete_operator_pod=False,
-                 hostnetwork=False,
-                 tolerations=None,
-                 configmaps=None,
-                 security_context=None,
-                 pod_runtime_info_envs=None,
-                 dnspolicy=None,
-                 full_pod_spec=None,
-                 priority_class_name=None,
-                 *args,
-                 **kwargs):
-        # https://github.com/apache/airflow/blob/2d0eff4ee4fafcf8c7978ac287a8fb968e56605f/UPDATING.md#unification-of-do_xcom_push-flag
-        if kwargs.get('xcom_push') is not None:
-            kwargs['do_xcom_push'] = kwargs.pop('xcom_push')
-            warnings.warn(
-                "`xcom_push` will be deprecated. Use `do_xcom_push` instead.",
-                DeprecationWarning, stacklevel=2
-            )
-        super(KubernetesPodOperator, self).__init__(*args, resources=None, **kwargs)
-        self.pod = None
-        self.do_xcom_push = do_xcom_push
-        self.image = image
-        self.namespace = namespace
-        self.cmds = cmds or []
-        self.arguments = arguments or []
-        self.labels = labels or {}
-        self.startup_timeout_seconds = startup_timeout_seconds
-        self.name = self._set_name(name)
-        self.env_vars = env_vars or {}
-        self.ports = ports or []
-        self.init_containers = init_containers or []
-        self.priority_class_name = priority_class_name
-        self.volume_mounts = volume_mounts or []
-        self.volumes = volumes or []
-        self.secrets = secrets or []
-        self.in_cluster = in_cluster
-        self.cluster_context = cluster_context
-        self.get_logs = get_logs
-        self.image_pull_policy = image_pull_policy
-        self.node_selectors = node_selectors or {}
-        self.annotations = annotations or {}
-        self.affinity = affinity or {}
-        self.resources = self._set_resources(resources)
-        self.config_file = config_file
-        self.image_pull_secrets = image_pull_secrets
-        self.service_account_name = service_account_name
-        self.is_delete_operator_pod = is_delete_operator_pod
-        self.hostnetwork = hostnetwork
-        self.tolerations = tolerations or []
-        self.configmaps = configmaps or []
-        self.security_context = security_context or {}
-        self.pod_runtime_info_envs = pod_runtime_info_envs or []
-        self.dnspolicy = dnspolicy
-        self.full_pod_spec = full_pod_spec
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            """This class is deprecated.
+            Please use
+            `airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator`.""",
+            DeprecationWarning, stacklevel=2
+        )
+        super(KubernetesPodOperator, self).__init__(*args, **kwargs)
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
similarity index 100%
copy from airflow/contrib/operators/kubernetes_pod_operator.py
copy to airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
diff --git a/chart/charts/postgresql-6.3.12.tgz b/chart/charts/postgresql-6.3.12.tgz
new file mode 100644
index 0000000..51751d7
Binary files /dev/null and b/chart/charts/postgresql-6.3.12.tgz differ