Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/02 00:59:07 UTC

[airflow] 01/01: [AIRFLOW-4851] Refactor K8S codebase with k8s API models (#5481)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit aaaa366527bf95801acaef7f01107a3cdd5ef938
Author: davlum <da...@gmail.com>
AuthorDate: Mon Jun 1 17:57:44 2020 -0700

    [AIRFLOW-4851] Refactor K8S codebase with k8s API models (#5481)
    
    * [AIRFLOW-4851] refactor Airflow kubernetes
    
    * [AIRFLOW-4851] refactor Airflow k8s models
    
    * [AIRFLOW-4851] Fix linting and tests
    * Refactor and add some tests
    
    * [AIRFLOW-4851] Add assertions to PodOperator tests
    
    Author: davlum <da...@gmail.com>
    Date:   Wed Sep 4 17:24:31 2019 -0400
    
    (cherry picked from commit 17d4179db2d4424b2bbd4a7f9546a0ca3628cac2)
---
 .../contrib/operators/kubernetes_pod_operator.py   | 194 +++--
 airflow/executors/kubernetes_executor.py           | 102 +--
 airflow/kubernetes/k8s_model.py                    |  61 ++
 .../kubernetes_request_factory/__init__.py         |  16 -
 .../kubernetes_request_factory.py                  | 258 ------
 .../pod_request_factory.py                         | 139 ----
 airflow/kubernetes/pod.py                          | 124 +--
 airflow/kubernetes/pod_generator.py                | 470 +++++++----
 airflow/kubernetes/pod_launcher.py                 |  58 +-
 airflow/kubernetes/pod_runtime_info_env.py         |  28 +-
 airflow/kubernetes/secret.py                       |  63 +-
 airflow/kubernetes/volume.py                       |  20 +-
 airflow/kubernetes/volume_mount.py                 |  24 +-
 airflow/kubernetes/worker_configuration.py         | 516 ++++++------
 airflow/settings.py                                |   5 +-
 .../ci/in_container/kubernetes/app/deploy_app.sh   |   4 +
 .../ci/in_container/kubernetes/app/secrets.yaml    |   4 +-
 .../ci/in_container/kubernetes/app/volumes.yaml    |   4 +-
 tests/executors/test_kubernetes_executor.py        | 882 +--------------------
 .../test_kubernetes_request_factory.py             | 396 ---------
 .../test_pod_request_factory.py                    | 175 ----
 .../__init__.py                                    |   0
 tests/kubernetes/models/test_pod.py                |  76 ++
 tests/kubernetes/models/test_secret.py             | 115 +++
 tests/kubernetes/test_pod_generator.py             | 328 ++++++++
 tests/kubernetes/test_pod_launcher.py              |  18 +-
 tests/kubernetes/test_worker_configuration.py      | 606 ++++++++++++++
 .../kubernetes/test_kubernetes_pod_operator.py     | 123 ++-
 tests/test_local_settings.py                       |   4 +-
 29 files changed, 2195 insertions(+), 2618 deletions(-)

diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index d121510..bacf2ed 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -15,15 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 """Executes task in a Kubernetes POD"""
-import re
 import warnings
 
 from airflow.exceptions import AirflowException
+from airflow.kubernetes import pod_generator, kube_client, pod_launcher
+from airflow.kubernetes.k8s_model import append_to_pod
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
-from airflow.kubernetes import kube_client, pod_generator, pod_launcher
-from airflow.kubernetes.pod import Resources
-from airflow.utils.helpers import validate_key
 from airflow.utils.state import State
 from airflow.version import version as airflow_version
 
@@ -49,13 +47,13 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                                If more than one secret is required, provide a
                                comma separated list: secret_a,secret_b
     :type image_pull_secrets: str
-    :param ports: ports for launched pod.
-    :type ports: list[airflow.kubernetes.pod.Port]
-    :param volume_mounts: volumeMounts for launched pod.
-    :type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
-    :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
-    :type volumes: list[airflow.kubernetes.volume.Volume]
-    :param labels: labels to apply to the Pod.
+    :param ports: ports for launched pod
+    :type ports: list[airflow.kubernetes.models.port.Port]
+    :param volume_mounts: volumeMounts for launched pod
+    :type volume_mounts: list[airflow.kubernetes.models.volume_mount.VolumeMount]
+    :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes
+    :type volumes: list[airflow.kubernetes.models.volume.Volume]
+    :param labels: labels to apply to the Pod
     :type labels: dict
     :param startup_timeout_seconds: timeout in seconds to startup the pod.
     :type startup_timeout_seconds: int
@@ -64,10 +62,10 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
     :type name: str
     :param env_vars: Environment variables initialized in the container. (templated)
     :type env_vars: dict
-    :param secrets: Kubernetes secrets to inject in the container.
-        They can be exposed as environment vars or files in a volume
-    :type secrets: list[airflow.kubernetes.secret.Secret]
-    :param in_cluster: run kubernetes client with in_cluster configuration.
+    :param secrets: Kubernetes secrets to inject in the container.
+        They can be exposed as environment vars or files in a volume.
+    :type secrets: list[airflow.kubernetes.models.secret.Secret]
+    :param in_cluster: run kubernetes client with in_cluster configuration
     :type in_cluster: bool
     :param cluster_context: context that points to kubernetes cluster.
         Ignored when in_cluster is True. If None, current-context is used.
@@ -105,15 +103,82 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         want mount as env variables.
     :type configmaps: list[str]
     :param pod_runtime_info_envs: environment variables about
-                                  pod runtime information (ip, namespace, nodeName, podName).
-    :type pod_runtime_info_envs: list[PodRuntimeEnv]
-    :param security_context: security options the pod should run with (PodSecurityContext).
-    :type security_context: dict
-    :param dnspolicy: dnspolicy for the pod.
+                                  pod runtime information (ip, namespace, nodeName, podName)
+    :type pod_runtime_info_envs: list[airflow.kubernetes.models.pod_runtime_info_env.PodRuntimeInfoEnv]
+    :param dnspolicy: Specify a dnspolicy for the pod
     :type dnspolicy: str
+    :param full_pod_spec: The complete podSpec
+    :type full_pod_spec: kubernetes.client.models.V1Pod
     """
     template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
 
+    def execute(self, context):
+        try:
+            client = kube_client.get_kube_client(in_cluster=self.in_cluster,
+                                                 cluster_context=self.cluster_context,
+                                                 config_file=self.config_file)
+            # Add the Airflow version to the labels, plus a label to
+            # identify that the pod was launched by KubernetesPodOperator
+            self.labels.update(
+                {
+                    'airflow_version': airflow_version.replace('+', '-'),
+                    'kubernetes_pod_operator': 'True',
+                }
+            )
+
+            pod = pod_generator.PodGenerator(
+                image=self.image,
+                namespace=self.namespace,
+                cmds=self.cmds,
+                args=self.arguments,
+                labels=self.labels,
+                name=self.name,
+                envs=self.env_vars,
+                extract_xcom=self.do_xcom_push,
+                image_pull_policy=self.image_pull_policy,
+                node_selectors=self.node_selectors,
+                annotations=self.annotations,
+                affinity=self.affinity,
+                image_pull_secrets=self.image_pull_secrets,
+                service_account_name=self.service_account_name,
+                hostnetwork=self.hostnetwork,
+                tolerations=self.tolerations,
+                configmaps=self.configmaps,
+                security_context=self.security_context,
+                dnspolicy=self.dnspolicy,
+                resources=self.resources,
+                pod=self.full_pod_spec,
+            ).gen_pod()
+
+            pod = append_to_pod(pod, self.ports)
+            pod = append_to_pod(pod, self.pod_runtime_info_envs)
+            pod = append_to_pod(pod, self.volumes)
+            pod = append_to_pod(pod, self.volume_mounts)
+            pod = append_to_pod(pod, self.secrets)
+
+            self.pod = pod
+
+            launcher = pod_launcher.PodLauncher(kube_client=client,
+                                                extract_xcom=self.do_xcom_push)
+
+            try:
+                (final_state, result) = launcher.run_pod(
+                    pod,
+                    startup_timeout=self.startup_timeout_seconds,
+                    get_logs=self.get_logs)
+            finally:
+                if self.is_delete_operator_pod:
+                    launcher.delete_pod(pod)
+
+            if final_state != State.SUCCESS:
+                raise AirflowException(
+                    'Pod returned a failure: {state}'.format(state=final_state)
+                )
+
+            return result
+        except AirflowException as ex:
+            raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
+
     @apply_defaults
     def __init__(self,  # pylint: disable=too-many-arguments,too-many-locals
                  namespace,
@@ -136,6 +201,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                  resources=None,
                  affinity=None,
                  config_file=None,
+                 do_xcom_push=False,
                  node_selectors=None,
                  image_pull_secrets=None,
                  service_account_name='default',
@@ -146,7 +212,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                  security_context=None,
                  pod_runtime_info_envs=None,
                  dnspolicy=None,
-                 do_xcom_push=False,
+                 full_pod_spec=None,
                  *args,
                  **kwargs):
         # https://github.com/apache/airflow/blob/2d0eff4ee4fafcf8c7978ac287a8fb968e56605f/UPDATING.md#unification-of-do_xcom_push-flag
@@ -157,6 +223,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                 DeprecationWarning, stacklevel=2
             )
         super(KubernetesPodOperator, self).__init__(*args, resources=None, **kwargs)
+        self.pod = None
         self.do_xcom_push = do_xcom_push
         self.image = image
         self.namespace = namespace
@@ -164,7 +231,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         self.arguments = arguments or []
         self.labels = labels or {}
         self.startup_timeout_seconds = startup_timeout_seconds
-        self.name = self._set_name(name)
+        self.name = name
         self.env_vars = env_vars or {}
         self.ports = ports or []
         self.volume_mounts = volume_mounts or []
@@ -177,7 +244,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         self.node_selectors = node_selectors or {}
         self.annotations = annotations or {}
         self.affinity = affinity or {}
-        self.resources = self._set_resources(resources)
+        self.resources = resources
         self.config_file = config_file
         self.image_pull_secrets = image_pull_secrets
         self.service_account_name = service_account_name
@@ -188,83 +255,4 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         self.security_context = security_context or {}
         self.pod_runtime_info_envs = pod_runtime_info_envs or []
         self.dnspolicy = dnspolicy
-
-    def execute(self, context):
-        try:
-            if self.in_cluster is not None:
-                client = kube_client.get_kube_client(in_cluster=self.in_cluster,
-                                                     cluster_context=self.cluster_context,
-                                                     config_file=self.config_file)
-            else:
-                client = kube_client.get_kube_client(cluster_context=self.cluster_context,
-                                                     config_file=self.config_file)
-
-            # Add Airflow Version to the label
-            # And a label to identify that pod is launched by KubernetesPodOperator
-            self.labels.update(
-                {
-                    'airflow_version': airflow_version.replace('+', '-'),
-                    'kubernetes_pod_operator': 'True',
-                }
-            )
-
-            gen = pod_generator.PodGenerator()
-
-            for port in self.ports:
-                gen.add_port(port)
-            for mount in self.volume_mounts:
-                gen.add_mount(mount)
-            for volume in self.volumes:
-                gen.add_volume(volume)
-
-            pod = gen.make_pod(
-                namespace=self.namespace,
-                image=self.image,
-                pod_id=self.name,
-                cmds=self.cmds,
-                arguments=self.arguments,
-                labels=self.labels,
-            )
-
-            pod.service_account_name = self.service_account_name
-            pod.secrets = self.secrets
-            pod.envs = self.env_vars
-            pod.image_pull_policy = self.image_pull_policy
-            pod.image_pull_secrets = self.image_pull_secrets
-            pod.annotations = self.annotations
-            pod.resources = self.resources
-            pod.affinity = self.affinity
-            pod.node_selectors = self.node_selectors
-            pod.hostnetwork = self.hostnetwork
-            pod.tolerations = self.tolerations
-            pod.configmaps = self.configmaps
-            pod.security_context = self.security_context
-            pod.pod_runtime_info_envs = self.pod_runtime_info_envs
-            pod.dnspolicy = self.dnspolicy
-
-            launcher = pod_launcher.PodLauncher(kube_client=client,
-                                                extract_xcom=self.do_xcom_push)
-            try:
-                (final_state, result) = launcher.run_pod(
-                    pod,
-                    startup_timeout=self.startup_timeout_seconds,
-                    get_logs=self.get_logs)
-            finally:
-                if self.is_delete_operator_pod:
-                    launcher.delete_pod(pod)
-
-            if final_state != State.SUCCESS:
-                raise AirflowException(
-                    'Pod returned a failure: {state}'.format(state=final_state)
-                )
-            if self.do_xcom_push:
-                return result
-        except AirflowException as ex:
-            raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
-
-    def _set_resources(self, resources):
-        return Resources(**resources) if resources else Resources()
-
-    def _set_name(self, name):
-        validate_key(name, max_length=63)
-        return re.sub(r'[^a-z0-9.-]+', '-', name.lower())
+        self.full_pod_spec = full_pod_spec
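
For reference, the refactored execute() above now builds the pod in two
stages: PodGenerator emits a plain kubernetes.client.models.V1Pod, and each
Airflow wrapper object then attaches itself via append_to_pod. A minimal
sketch of that flow outside the operator (argument values are illustrative,
not from the commit):

    from airflow.kubernetes import pod_generator
    from airflow.kubernetes.k8s_model import append_to_pod
    from airflow.kubernetes.pod import Port

    # Stage 1: generate a bare V1Pod from keyword arguments.
    pod = pod_generator.PodGenerator(
        image='python:3.6-alpine',
        namespace='default',
        name='example-task',
        cmds=['python', '-c'],
        args=['print("hello")'],
        labels={'kubernetes_pod_operator': 'True'},
    ).gen_pod()

    # Stage 2: fold the Airflow model objects onto the pod. Each one
    # returns a new (deep-copied) V1Pod with itself attached.
    pod = append_to_pod(pod, [Port('http', 8080)])
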
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 32f4fd5..6944274 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -36,8 +36,8 @@ from airflow.configuration import conf
 from airflow.kubernetes.pod_launcher import PodLauncher
 from airflow.kubernetes.kube_client import get_kube_client
 from airflow.kubernetes.worker_configuration import WorkerConfiguration
+from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.executors.base_executor import BaseExecutor
-from airflow.executors import Executors
 from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance
 from airflow.utils.state import State
 from airflow.utils.db import provide_session, create_session
@@ -49,87 +49,6 @@ MAX_POD_ID_LEN = 253
 MAX_LABEL_LEN = 63
 
 
-class KubernetesExecutorConfig:
-    def __init__(self, image=None, image_pull_policy=None, request_memory=None,
-                 request_cpu=None, limit_memory=None, limit_cpu=None, limit_gpu=None,
-                 gcp_service_account_key=None, node_selectors=None, affinity=None,
-                 annotations=None, volumes=None, volume_mounts=None, tolerations=None, labels=None):
-        self.image = image
-        self.image_pull_policy = image_pull_policy
-        self.request_memory = request_memory
-        self.request_cpu = request_cpu
-        self.limit_memory = limit_memory
-        self.limit_cpu = limit_cpu
-        self.limit_gpu = limit_gpu
-        self.gcp_service_account_key = gcp_service_account_key
-        self.node_selectors = node_selectors
-        self.affinity = affinity
-        self.annotations = annotations
-        self.volumes = volumes
-        self.volume_mounts = volume_mounts
-        self.tolerations = tolerations
-        self.labels = labels or {}
-
-    def __repr__(self):
-        return "{}(image={}, image_pull_policy={}, request_memory={}, request_cpu={}, " \
-               "limit_memory={}, limit_cpu={}, limit_gpu={}, gcp_service_account_key={}, " \
-               "node_selectors={}, affinity={}, annotations={}, volumes={}, " \
-               "volume_mounts={}, tolerations={}, labels={})" \
-            .format(KubernetesExecutorConfig.__name__, self.image, self.image_pull_policy,
-                    self.request_memory, self.request_cpu, self.limit_memory,
-                    self.limit_cpu, self.limit_gpu, self.gcp_service_account_key, self.node_selectors,
-                    self.affinity, self.annotations, self.volumes, self.volume_mounts,
-                    self.tolerations, self.labels)
-
-    @staticmethod
-    def from_dict(obj):
-        if obj is None:
-            return KubernetesExecutorConfig()
-
-        if not isinstance(obj, dict):
-            raise TypeError(
-                'Cannot convert a non-dictionary object into a KubernetesExecutorConfig')
-
-        namespaced = obj.get(Executors.KubernetesExecutor, {})
-
-        return KubernetesExecutorConfig(
-            image=namespaced.get('image', None),
-            image_pull_policy=namespaced.get('image_pull_policy', None),
-            request_memory=namespaced.get('request_memory', None),
-            request_cpu=namespaced.get('request_cpu', None),
-            limit_memory=namespaced.get('limit_memory', None),
-            limit_cpu=namespaced.get('limit_cpu', None),
-            limit_gpu=namespaced.get('limit_gpu', None),
-            gcp_service_account_key=namespaced.get('gcp_service_account_key', None),
-            node_selectors=namespaced.get('node_selectors', None),
-            affinity=namespaced.get('affinity', None),
-            annotations=namespaced.get('annotations', {}),
-            volumes=namespaced.get('volumes', []),
-            volume_mounts=namespaced.get('volume_mounts', []),
-            tolerations=namespaced.get('tolerations', None),
-            labels=namespaced.get('labels', {}),
-        )
-
-    def as_dict(self):
-        return {
-            'image': self.image,
-            'image_pull_policy': self.image_pull_policy,
-            'request_memory': self.request_memory,
-            'request_cpu': self.request_cpu,
-            'limit_memory': self.limit_memory,
-            'limit_cpu': self.limit_cpu,
-            'limit_gpu': self.limit_gpu,
-            'gcp_service_account_key': self.gcp_service_account_key,
-            'node_selectors': self.node_selectors,
-            'affinity': self.affinity,
-            'annotations': self.annotations,
-            'volumes': self.volumes,
-            'volume_mounts': self.volume_mounts,
-            'tolerations': self.tolerations,
-            'labels': self.labels,
-        }
-
-
 class KubeConfig:
     """Configuration for Kubernetes"""
     core_section = 'core'
@@ -473,17 +392,23 @@ class AirflowKubernetesScheduler(LoggingMixin):
         self.log.info('Kubernetes job is %s', str(next_job))
         key, command, kube_executor_config = next_job
         dag_id, task_id, execution_date, try_number = key
-        self.log.debug("Kubernetes running for command %s", command)
-        self.log.debug("Kubernetes launching image %s", self.kube_config.kube_image)
-        pod = self.worker_configuration.make_pod(
-            namespace=self.namespace, worker_uuid=self.worker_uuid,
+
+        config_pod = self.worker_configuration.make_pod(
+            namespace=self.namespace,
+            worker_uuid=self.worker_uuid,
             pod_id=self._create_pod_id(dag_id, task_id),
             dag_id=self._make_safe_label_value(dag_id),
             task_id=self._make_safe_label_value(task_id),
             try_number=try_number,
             execution_date=self._datetime_to_label_safe_datestring(execution_date),
-            airflow_command=command, kube_executor_config=kube_executor_config
+            airflow_command=command
         )
+        # Reconcile the pod generated by the Operator and the Pod
+        # generated by the .cfg file
+        pod = PodGenerator.reconcile_pods(config_pod, kube_executor_config)
+        self.log.debug("Kubernetes running for command %s", command)
+        self.log.debug("Kubernetes launching image %s", pod.spec.containers[0].image)
+
         # the watcher will monitor pods, so we do not block.
         self.launcher.run_pod_async(pod, **self.kube_config.kube_client_request_args)
         self.log.debug("Kubernetes Job created!")
@@ -841,7 +766,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
             'Add task %s with command %s with executor_config %s',
             key, command, executor_config
         )
-        kube_executor_config = KubernetesExecutorConfig.from_dict(executor_config)
+
+        kube_executor_config = PodGenerator.from_obj(executor_config)
         self.task_queue.put((key, command, kube_executor_config))
 
     def sync(self):
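
With KubernetesExecutorConfig removed, per-task executor_config dicts are
now parsed by PodGenerator.from_obj. A sketch of the accepted shape,
assuming Executors.KubernetesExecutor resolves to the string
'KubernetesExecutor' (the keys mirror the old config fields; values are
illustrative):

    from airflow.kubernetes.pod_generator import PodGenerator

    executor_config = {
        'KubernetesExecutor': {
            'image': 'airflow-worker:latest',
            'request_memory': '256Mi',
            'request_cpu': '200m',
            'limit_memory': '512Mi',
            'annotations': {'team': 'data-eng'},
        }
    }

    # Returns a kubernetes.client.models.V1Pod, which the scheduler then
    # merges with the .cfg-derived pod via PodGenerator.reconcile_pods.
    pod = PodGenerator.from_obj(executor_config)
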
diff --git a/airflow/kubernetes/k8s_model.py b/airflow/kubernetes/k8s_model.py
new file mode 100644
index 0000000..7049b1d
--- /dev/null
+++ b/airflow/kubernetes/k8s_model.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
+
+import abc
+import sys
+from functools import reduce
+
+if sys.version_info >= (3, 4):
+    ABC = abc.ABC
+else:
+    ABC = abc.ABCMeta('ABC', (), {})
+
+
+class K8SModel(ABC):
+    """
+    These Airflow Kubernetes models exist for backwards compatibility
+    reasons only. Ideally clients should use the Kubernetes API directly,
+    so that the conversion process
+
+        client input -> Airflow k8s models -> k8s models
+
+    can be avoided. All of these models implement the
+    `attach_to_pod` method so that they integrate with the kubernetes client.
+    """
+    @abc.abstractmethod
+    def attach_to_pod(self, pod):
+        """
+        :param pod: A pod to attach this Kubernetes object to
+        :type pod: kubernetes.client.models.V1Pod
+        :return: The pod with the object attached
+        """
+
+
+def append_to_pod(pod, k8s_objects):
+    """
+    :param pod: A pod to attach a list of Kubernetes objects to
+    :type pod: kubernetes.client.models.V1Pod
+    :param k8s_objects: an optional list of K8SModels
+    :type k8s_objects: Optional[List[K8SModel]]
+    :return: pod with the objects attached if they exist
+    """
+    if not k8s_objects:
+        return pod
+    return reduce(lambda p, o: o.attach_to_pod(p), k8s_objects, pod)
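
A minimal sketch of the attach_to_pod contract defined above, using a
hypothetical K8SModel subclass (ServiceAccount is illustrative only and not
part of this commit):

    import copy

    from airflow.kubernetes.k8s_model import K8SModel, append_to_pod

    class ServiceAccount(K8SModel):
        """Hypothetical model that sets the pod's service account."""
        def __init__(self, name):
            self.name = name

        def attach_to_pod(self, pod):
            # Work on a deep copy, as the shipped models do, so the
            # input V1Pod is never mutated in place.
            cp_pod = copy.deepcopy(pod)
            cp_pod.spec.service_account_name = self.name
            return cp_pod

    # append_to_pod reduces left to right, so each object sees the
    # result of the previous attachment:
    # pod = append_to_pod(pod, [ServiceAccount('airflow-worker')])
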
diff --git a/airflow/kubernetes/kubernetes_request_factory/__init__.py b/airflow/kubernetes/kubernetes_request_factory/__init__.py
deleted file mode 100644
index 13a8339..0000000
--- a/airflow/kubernetes/kubernetes_request_factory/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/airflow/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
deleted file mode 100644
index ad58510..0000000
--- a/airflow/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ /dev/null
@@ -1,258 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from abc import ABCMeta, abstractmethod
-import six
-
-
-class KubernetesRequestFactory:
-    """
-    Create requests to be sent to kube API.
-    Extend this class to talk to kubernetes and generate your specific resources.
-    This is equivalent of generating yaml files that can be used by `kubectl`
-    """
-    __metaclass__ = ABCMeta
-
-    @abstractmethod
-    def create(self, pod):
-        """
-        Creates the request for kubernetes API.
-
-        :param pod: The pod object
-        """
-
-    @staticmethod
-    def extract_image(pod, req):
-        req['spec']['containers'][0]['image'] = pod.image
-
-    @staticmethod
-    def extract_image_pull_policy(pod, req):
-        if pod.image_pull_policy:
-            req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
-
-    @staticmethod
-    def add_secret_to_env(env, secret):
-        env.append({
-            'name': secret.deploy_target,
-            'valueFrom': {
-                'secretKeyRef': {
-                    'name': secret.secret,
-                    'key': secret.key
-                }
-            }
-        })
-
-    @staticmethod
-    def add_runtime_info_env(env, runtime_info):
-        env.append({
-            'name': runtime_info.name,
-            'valueFrom': {
-                'fieldRef': {
-                    'fieldPath': runtime_info.field_path
-                }
-            }
-        })
-
-    @staticmethod
-    def extract_labels(pod, req):
-        req['metadata']['labels'] = req['metadata'].get('labels', {})
-        for k, v in six.iteritems(pod.labels):
-            req['metadata']['labels'][k] = v
-
-    @staticmethod
-    def extract_annotations(pod, req):
-        req['metadata']['annotations'] = req['metadata'].get('annotations', {})
-        for k, v in six.iteritems(pod.annotations):
-            req['metadata']['annotations'][k] = v
-
-    @staticmethod
-    def extract_affinity(pod, req):
-        req['spec']['affinity'] = req['spec'].get('affinity', {})
-        for k, v in six.iteritems(pod.affinity):
-            req['spec']['affinity'][k] = v
-
-    @staticmethod
-    def extract_node_selector(pod, req):
-        req['spec']['nodeSelector'] = req['spec'].get('nodeSelector', {})
-        for k, v in six.iteritems(pod.node_selectors):
-            req['spec']['nodeSelector'][k] = v
-
-    @staticmethod
-    def extract_cmds(pod, req):
-        req['spec']['containers'][0]['command'] = pod.cmds
-
-    @staticmethod
-    def extract_args(pod, req):
-        req['spec']['containers'][0]['args'] = pod.args
-
-    @staticmethod
-    def attach_ports(pod, req):
-        req['spec']['containers'][0]['ports'] = (
-            req['spec']['containers'][0].get('ports', []))
-        if len(pod.ports) > 0:
-            req['spec']['containers'][0]['ports'].extend(pod.ports)
-
-    @staticmethod
-    def attach_volumes(pod, req):
-        req['spec']['volumes'] = (
-            req['spec'].get('volumes', []))
-        if len(pod.volumes) > 0:
-            req['spec']['volumes'].extend(pod.volumes)
-
-    @staticmethod
-    def attach_volume_mounts(pod, req):
-        if len(pod.volume_mounts) > 0:
-            req['spec']['containers'][0]['volumeMounts'] = (
-                req['spec']['containers'][0].get('volumeMounts', []))
-            req['spec']['containers'][0]['volumeMounts'].extend(pod.volume_mounts)
-
-    @staticmethod
-    def extract_name(pod, req):
-        req['metadata']['name'] = pod.name
-
-    @staticmethod
-    def extract_volume_secrets(pod, req):
-        vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
-        if any(vol_secrets):
-            req['spec']['containers'][0]['volumeMounts'] = (
-                req['spec']['containers'][0].get('volumeMounts', []))
-            req['spec']['volumes'] = (
-                req['spec'].get('volumes', []))
-        for idx, vol in enumerate(vol_secrets):
-            vol_id = 'secretvol' + str(idx)
-            req['spec']['containers'][0]['volumeMounts'].append({
-                'mountPath': vol.deploy_target,
-                'name': vol_id,
-                'readOnly': True
-            })
-            req['spec']['volumes'].append({
-                'name': vol_id,
-                'secret': {
-                    'secretName': vol.secret
-                }
-            })
-
-    @staticmethod
-    def extract_env_and_secrets(pod, req):
-        envs_from_key_secrets = [
-            env for env in pod.secrets if env.deploy_type == 'env' and env.key is not None
-        ]
-
-        if len(pod.envs) > 0 or len(envs_from_key_secrets) > 0 or len(pod.pod_runtime_info_envs) > 0:
-            env = []
-            for runtime_info in pod.pod_runtime_info_envs:
-                KubernetesRequestFactory.add_runtime_info_env(env, runtime_info)
-            for k in pod.envs.keys():
-                env.append({'name': k, 'value': pod.envs[k]})
-            for secret in envs_from_key_secrets:
-                KubernetesRequestFactory.add_secret_to_env(env, secret)
-
-            req['spec']['containers'][0]['env'] = env
-
-        KubernetesRequestFactory._apply_env_from(pod, req)
-
-    @staticmethod
-    def extract_resources(pod, req):
-        if not pod.resources or pod.resources.is_empty_resource_request():
-            return
-
-        req['spec']['containers'][0]['resources'] = {}
-
-        if pod.resources.has_requests():
-            req['spec']['containers'][0]['resources']['requests'] = {}
-            if pod.resources.request_memory:
-                req['spec']['containers'][0]['resources']['requests'][
-                    'memory'] = pod.resources.request_memory
-            if pod.resources.request_cpu:
-                req['spec']['containers'][0]['resources']['requests'][
-                    'cpu'] = pod.resources.request_cpu
-
-        if pod.resources.has_limits():
-            req['spec']['containers'][0]['resources']['limits'] = {}
-            if pod.resources.limit_memory:
-                req['spec']['containers'][0]['resources']['limits'][
-                    'memory'] = pod.resources.limit_memory
-            if pod.resources.limit_cpu:
-                req['spec']['containers'][0]['resources']['limits'][
-                    'cpu'] = pod.resources.limit_cpu
-            if pod.resources.limit_gpu:
-                req['spec']['containers'][0]['resources']['limits'][
-                    'nvidia.com/gpu'] = pod.resources.limit_gpu
-
-    @staticmethod
-    def extract_init_containers(pod, req):
-        if pod.init_containers:
-            req['spec']['initContainers'] = pod.init_containers
-
-    @staticmethod
-    def extract_service_account_name(pod, req):
-        if pod.service_account_name:
-            req['spec']['serviceAccountName'] = pod.service_account_name
-
-    @staticmethod
-    def extract_hostnetwork(pod, req):
-        if pod.hostnetwork:
-            req['spec']['hostNetwork'] = pod.hostnetwork
-
-    @staticmethod
-    def extract_dnspolicy(pod, req):
-        if pod.dnspolicy:
-            req['spec']['dnsPolicy'] = pod.dnspolicy
-
-    @staticmethod
-    def extract_image_pull_secrets(pod, req):
-        if pod.image_pull_secrets:
-            req['spec']['imagePullSecrets'] = [{
-                'name': pull_secret
-            } for pull_secret in pod.image_pull_secrets.split(',')]
-
-    @staticmethod
-    def extract_tolerations(pod, req):
-        if pod.tolerations:
-            req['spec']['tolerations'] = pod.tolerations
-
-    @staticmethod
-    def extract_security_context(pod, req):
-        if pod.security_context:
-            req['spec']['securityContext'] = pod.security_context
-
-    @staticmethod
-    def _apply_env_from(pod, req):
-        envs_from_secrets = [
-            env for env in pod.secrets if env.deploy_type == 'env' and env.key is None
-        ]
-
-        if pod.configmaps or envs_from_secrets:
-            req['spec']['containers'][0]['envFrom'] = []
-
-        for secret in envs_from_secrets:
-            req['spec']['containers'][0]['envFrom'].append(
-                {
-                    'secretRef': {
-                        'name': secret.secret
-                    }
-                }
-            )
-
-        for configmap in pod.configmaps:
-            req['spec']['containers'][0]['envFrom'].append(
-                {
-                    'configMapRef': {
-                        'name': configmap
-                    }
-                }
-            )
diff --git a/airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py
deleted file mode 100644
index 3790948..0000000
--- a/airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ /dev/null
@@ -1,139 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import yaml
-from airflow.kubernetes.pod import Pod
-from airflow.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
-    import KubernetesRequestFactory
-
-
-class SimplePodRequestFactory(KubernetesRequestFactory):
-    """
-    Request generator for a pod.
-
-    """
-    _yaml = """apiVersion: v1
-kind: Pod
-metadata:
-  name: name
-spec:
-  containers:
-    - name: base
-      image: airflow-worker:latest
-      command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
-  restartPolicy: Never
-    """
-
-    def __init__(self):
-
-        pass
-
-    def create(self, pod):
-        # type: (Pod) -> dict
-        req = yaml.safe_load(self._yaml)
-        self.extract_name(pod, req)
-        self.extract_labels(pod, req)
-        self.extract_image(pod, req)
-        self.extract_image_pull_policy(pod, req)
-        self.extract_cmds(pod, req)
-        self.extract_args(pod, req)
-        self.extract_node_selector(pod, req)
-        self.extract_env_and_secrets(pod, req)
-        self.extract_volume_secrets(pod, req)
-        self.attach_ports(pod, req)
-        self.attach_volumes(pod, req)
-        self.attach_volume_mounts(pod, req)
-        self.extract_resources(pod, req)
-        self.extract_service_account_name(pod, req)
-        self.extract_init_containers(pod, req)
-        self.extract_image_pull_secrets(pod, req)
-        self.extract_annotations(pod, req)
-        self.extract_affinity(pod, req)
-        self.extract_hostnetwork(pod, req)
-        self.extract_tolerations(pod, req)
-        self.extract_security_context(pod, req)
-        self.extract_dnspolicy(pod, req)
-        return req
-
-
-class ExtractXcomPodRequestFactory(KubernetesRequestFactory):
-    """
-    Request generator for a pod with sidecar container.
-
-    """
-
-    XCOM_MOUNT_PATH = '/airflow/xcom'
-    SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
-    _yaml = """apiVersion: v1
-kind: Pod
-metadata:
-  name: name
-spec:
-  volumes:
-    - name: xcom
-      emptyDir: {{}}
-  containers:
-    - name: base
-      image: airflow-worker:latest
-      command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
-      volumeMounts:
-        - name: xcom
-          mountPath: {xcomMountPath}
-    - name: {sidecarContainerName}
-      image: alpine
-      command:
-        - sh
-        - -c
-        - 'trap "exit 0" INT; while true; do sleep 30; done;'
-      volumeMounts:
-        - name: xcom
-          mountPath: {xcomMountPath}
-      resources:
-        requests:
-          cpu: 1m
-  restartPolicy: Never
-    """.format(xcomMountPath=XCOM_MOUNT_PATH, sidecarContainerName=SIDECAR_CONTAINER_NAME)
-
-    def __init__(self):
-        pass
-
-    def create(self, pod):
-        # type: (Pod) -> dict
-        req = yaml.safe_load(self._yaml)
-        self.extract_name(pod, req)
-        self.extract_labels(pod, req)
-        self.extract_image(pod, req)
-        self.extract_image_pull_policy(pod, req)
-        self.extract_cmds(pod, req)
-        self.extract_args(pod, req)
-        self.extract_node_selector(pod, req)
-        self.extract_env_and_secrets(pod, req)
-        self.extract_volume_secrets(pod, req)
-        self.attach_ports(pod, req)
-        self.attach_volumes(pod, req)
-        self.attach_volume_mounts(pod, req)
-        self.extract_resources(pod, req)
-        self.extract_service_account_name(pod, req)
-        self.extract_init_containers(pod, req)
-        self.extract_image_pull_secrets(pod, req)
-        self.extract_annotations(pod, req)
-        self.extract_affinity(pod, req)
-        self.extract_hostnetwork(pod, req)
-        self.extract_tolerations(pod, req)
-        self.extract_security_context(pod, req)
-        self.extract_dnspolicy(pod, req)
-        return req
diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index a9f3dfe..53c9172 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -14,9 +14,16 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
 
+import copy
+import kubernetes.client.models as k8s
+from airflow.kubernetes.k8s_model import K8SModel
 
-class Resources:
+
+class Resources(K8SModel):
     __slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu')
 
     def __init__(
@@ -41,13 +48,20 @@ class Resources:
     def has_requests(self):
         return self.request_cpu is not None or self.request_memory is not None
 
-    def __str__(self):
-        return "Request: [cpu: {}, memory: {}], Limit: [cpu: {}, memory: {}, gpu: {}]".format(
-            self.request_cpu, self.request_memory, self.limit_cpu, self.limit_memory, self.limit_gpu
+    def to_k8s_client_obj(self):
+        return k8s.V1ResourceRequirements(
+            limits={'cpu': self.limit_cpu, 'memory': self.limit_memory, 'nvidia.com/gpu': self.limit_gpu},
+            requests={'cpu': self.request_cpu, 'memory': self.request_memory}
         )
 
+    def attach_to_pod(self, pod):
+        cp_pod = copy.deepcopy(pod)
+        resources = self.to_k8s_client_obj()
+        cp_pod.spec.containers[0].resources = resources
+        return cp_pod
+
 
-class Port:
+class Port(K8SModel):
     def __init__(
             self,
             name=None,
@@ -55,96 +69,12 @@ class Port:
         self.name = name
         self.container_port = container_port
 
+    def to_k8s_client_obj(self):
+        return k8s.V1ContainerPort(name=self.name, container_port=self.container_port)
 
-class Pod:
-    """
-    Represents a kubernetes pod and manages execution of a single pod.
-    :param image: The docker image
-    :type image: str
-    :param envs: A dict containing the environment variables
-    :type envs: dict
-    :param cmds: The command to be run on the pod
-    :type cmds: list[str]
-    :param secrets: Secrets to be launched to the pod
-    :type secrets: list[airflow.kubernetes.secret.Secret]
-    :param result: The result that will be returned to the operator after
-        successful execution of the pod
-    :type result: any
-    :param image_pull_policy: Specify a policy to cache or always pull an image
-    :type image_pull_policy: str
-    :param image_pull_secrets: Any image pull secrets to be given to the pod.
-        If more than one secret is required, provide a comma separated list:
-        secret_a,secret_b
-    :type image_pull_secrets: str
-    :param affinity: A dict containing a group of affinity scheduling rules
-    :type affinity: dict
-    :param hostnetwork: If True enable host networking on the pod
-    :type hostnetwork: bool
-    :param tolerations: A list of kubernetes tolerations
-    :type tolerations: list
-    :param security_context: A dict containing the security context for the pod
-    :type security_context: dict
-    :param configmaps: A list containing names of configmaps object
-        mounting env variables to the pod
-    :type configmaps: list[str]
-    :param pod_runtime_info_envs: environment variables about
-                                  pod runtime information (ip, namespace, nodeName, podName)
-    :type pod_runtime_info_envs: list[PodRuntimeEnv]
-    :param dnspolicy: Specify a dnspolicy for the pod
-    :type dnspolicy: str
-    """
-    def __init__(
-            self,
-            image,
-            envs,
-            cmds,
-            args=None,
-            secrets=None,
-            labels=None,
-            node_selectors=None,
-            name=None,
-            ports=None,
-            volumes=None,
-            volume_mounts=None,
-            namespace='default',
-            result=None,
-            image_pull_policy='IfNotPresent',
-            image_pull_secrets=None,
-            init_containers=None,
-            service_account_name=None,
-            resources=None,
-            annotations=None,
-            affinity=None,
-            hostnetwork=False,
-            tolerations=None,
-            security_context=None,
-            configmaps=None,
-            pod_runtime_info_envs=None,
-            dnspolicy=None
-    ):
-        self.image = image
-        self.envs = envs or {}
-        self.cmds = cmds
-        self.args = args or []
-        self.secrets = secrets or []
-        self.result = result
-        self.labels = labels or {}
-        self.name = name
-        self.ports = ports or []
-        self.volumes = volumes or []
-        self.volume_mounts = volume_mounts or []
-        self.node_selectors = node_selectors or {}
-        self.namespace = namespace
-        self.image_pull_policy = image_pull_policy
-        self.image_pull_secrets = image_pull_secrets
-        self.init_containers = init_containers
-        self.service_account_name = service_account_name
-        self.resources = resources or Resources()
-        self.annotations = annotations or {}
-        self.affinity = affinity or {}
-        self.hostnetwork = hostnetwork or False
-        self.tolerations = tolerations or []
-        self.security_context = security_context
-        self.configmaps = configmaps or []
-        self.pod_runtime_info_envs = pod_runtime_info_envs or []
-        self.dnspolicy = dnspolicy
+    def attach_to_pod(self, pod):
+        cp_pod = copy.deepcopy(pod)
+        port = self.to_k8s_client_obj()
+        cp_pod.spec.containers[0].ports = cp_pod.spec.containers[0].ports or []
+        cp_pod.spec.containers[0].ports.append(port)
+        return cp_pod
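
The Resources and Port models above no longer rely on a request factory for
serialization; each converts itself into the corresponding
kubernetes.client object and copies it onto the pod's first container. A
short sketch, assuming the Resources constructor accepts the slot names
above as keyword arguments (values illustrative):

    from airflow.kubernetes.pod import Resources

    resources = Resources(request_cpu='100m', request_memory='256Mi',
                          limit_memory='512Mi')
    req = resources.to_k8s_client_obj()   # a V1ResourceRequirements
    # pod = resources.attach_to_pod(pod)  # deep-copies pod, then sets
    #                                     # spec.containers[0].resources
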
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 56b41d7..249a5ad 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -14,169 +14,335 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-from airflow.kubernetes.pod import Pod, Port
-from airflow.kubernetes.volume import Volume
-from airflow.kubernetes.volume_mount import VolumeMount
-
+"""
+This module provides an interface on top of the previous Pod
+API that outputs a kubernetes.client.models.V1Pod.
+The advantage is that the full Kubernetes API
+is supported and no custom serialization needs to be written.
+"""
+
+import copy
+import kubernetes.client.models as k8s
+from airflow.executors import Executors
 import uuid
 
 
-class PodGenerator:
-    """Contains Kubernetes Airflow Worker configuration logic"""
-
-    def __init__(self, kube_config=None):
-        self.kube_config = kube_config
-        self.ports = []
-        self.volumes = []
-        self.volume_mounts = []
-        self.init_containers = []
-
-    def add_init_container(self,
-                           name,
-                           image,
-                           security_context,
-                           init_environment,
-                           volume_mounts
-                           ):
-        """
-
-        Adds an init container to the launched pod. useful for pre-
+class PodDefaults:
+    """
+    Static defaults for the PodGenerator
+    """
+    XCOM_MOUNT_PATH = '/airflow/xcom'
+    SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
+    XCOM_CMD = """import time
+while True:
+    try:
+        time.sleep(3600)
+    except KeyboardInterrupt:
+        exit(0)
+    """
+    VOLUME_MOUNT = k8s.V1VolumeMount(
+        name='xcom',
+        mount_path=XCOM_MOUNT_PATH
+    )
+    VOLUME = k8s.V1Volume(
+        name='xcom',
+        empty_dir=k8s.V1EmptyDirVolumeSource()
+    )
+    SIDECAR_CONTAINER = k8s.V1Container(
+        name=SIDECAR_CONTAINER_NAME,
+        command=['python', '-c', XCOM_CMD],
+        image='python:3.5-alpine',
+        volume_mounts=[VOLUME_MOUNT]
+    )
 
-        Args:
-            name (str):
-            image (str):
-            security_context (dict):
-            init_environment (dict):
-            volume_mounts (dict):
 
-        Returns:
+class PodGenerator:
+    """
+    Contains Kubernetes Airflow Worker configuration logic
+
+    Represents a kubernetes pod and generates the configuration needed to launch it.
+    :param image: The docker image
+    :type image: str
+    :param envs: A dict containing the environment variables
+    :type envs: Dict[str, str]
+    :param cmds: The command to be run on the pod
+    :type cmds: List[str]
+    :param secrets: Secrets to be launched to the pod
+    :type secrets: List[airflow.kubernetes.models.secret.Secret]
+    :param image_pull_policy: Specify a policy to cache or always pull an image
+    :type image_pull_policy: str
+    :param image_pull_secrets: Any image pull secrets to be given to the pod.
+        If more than one secret is required, provide a comma separated list:
+        secret_a,secret_b
+    :type image_pull_secrets: str
+    :param affinity: A dict containing a group of affinity scheduling rules
+    :type affinity: dict
+    :param hostnetwork: If True enable host networking on the pod
+    :type hostnetwork: bool
+    :param tolerations: A list of kubernetes tolerations
+    :type tolerations: list
+    :param security_context: A dict containing the security context for the pod
+    :type security_context: dict
+    :param configmaps: A list of ConfigMap names from which to expose
+        environment variables to the pod via envFrom, e.g.
+        ['configmap_a', 'configmap_b']
+    :type configmaps: List[str]
+    :param dnspolicy: Specify a dnspolicy for the pod
+    :type dnspolicy: str
+    :param pod: The fully specified pod.
+    :type pod: kubernetes.client.models.V1Pod
+    """
+
+    def __init__(
+        self,
+        image,
+        name=None,
+        namespace=None,
+        volume_mounts=None,
+        envs=None,
+        cmds=None,
+        args=None,
+        labels=None,
+        node_selectors=None,
+        ports=None,
+        volumes=None,
+        image_pull_policy='IfNotPresent',
+        restart_policy='Never',
+        image_pull_secrets=None,
+        init_containers=None,
+        service_account_name=None,
+        resources=None,
+        annotations=None,
+        affinity=None,
+        hostnetwork=False,
+        tolerations=None,
+        security_context=None,
+        configmaps=None,
+        dnspolicy=None,
+        pod=None,
+        extract_xcom=False,
+    ):
+        self.ud_pod = pod
+        self.pod = k8s.V1Pod()
+        self.pod.api_version = 'v1'
+        self.pod.kind = 'Pod'
+
+        # Pod Metadata
+        self.metadata = k8s.V1ObjectMeta()
+        self.metadata.labels = labels
+        self.metadata.name = name + "-" + str(uuid.uuid4())[:8] if name else None
+        self.metadata.namespace = namespace
+        self.metadata.annotations = annotations
+
+        # Pod Container
+        self.container = k8s.V1Container(name='base')
+        self.container.image = image
+        self.container.env = []
+
+        if envs:
+            if isinstance(envs, dict):
+                for key, val in envs.items():
+                    self.container.env.append(k8s.V1EnvVar(
+                        name=key,
+                        value=val
+                    ))
+            elif isinstance(envs, list):
+                self.container.env.extend(envs)
+
+        configmaps = configmaps or []
+        self.container.env_from = []
+        for configmap in configmaps:
+            self.container.env_from.append(k8s.V1EnvFromSource(
+                config_map_ref=k8s.V1ConfigMapEnvSource(
+                    name=configmap
+                )
+            ))
+
+        self.container.command = cmds or []
+        self.container.args = args or []
+        self.container.image_pull_policy = image_pull_policy
+        self.container.ports = ports or []
+        self.container.resources = resources
+        self.container.volume_mounts = volume_mounts or []
+
+        # Pod Spec
+        self.spec = k8s.V1PodSpec(containers=[])
+        self.spec.security_context = security_context
+        self.spec.tolerations = tolerations
+        self.spec.dns_policy = dnspolicy
+        self.spec.host_network = hostnetwork
+        self.spec.affinity = affinity
+        self.spec.service_account_name = service_account_name
+        self.spec.init_containers = init_containers
+        self.spec.volumes = volumes or []
+        self.spec.node_selector = node_selectors
+        self.spec.restart_policy = restart_policy
+
+        self.spec.image_pull_secrets = []
+
+        if image_pull_secrets:
+            for image_pull_secret in image_pull_secrets.split(','):
+                self.spec.image_pull_secrets.append(k8s.V1LocalObjectReference(
+                    name=image_pull_secret
+                ))
+
+        # Attach sidecar
+        self.extract_xcom = extract_xcom
+
+    def gen_pod(self):
+        result = self.ud_pod
+
+        if result is None:
+            result = self.pod
+            result.spec = self.spec
+            result.metadata = self.metadata
+            result.spec.containers = [self.container]
+
+        if self.extract_xcom:
+            result = self.add_sidecar(result)
+
+        return result
+
+    @staticmethod
+    def add_sidecar(pod):
+        pod_cp = copy.deepcopy(pod)
+
+        pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)
+        pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT)
+        pod_cp.spec.containers.append(PodDefaults.SIDECAR_CONTAINER)
+
+        return pod_cp
+
+    @staticmethod
+    def from_obj(obj):
+        if obj is None:
+            return k8s.V1Pod()
+
+        if isinstance(obj, PodGenerator):
+            return obj.gen_pod()
+
+        if not isinstance(obj, dict):
+            raise TypeError(
+                'Cannot convert a non-dictionary or non-PodGenerator '
+                'object into a KubernetesExecutorConfig')
+
+        namespaced = obj.get(Executors.KubernetesExecutor, {})
+
+        resources = namespaced.get('resources')
+
+        if resources is None:
+            requests = {
+                'cpu': namespaced.get('request_cpu'),
+                'memory': namespaced.get('request_memory')
 
-        """
-        self.init_containers.append(
-            {
-                'name': name,
-                'image': image,
-                'securityContext': security_context,
-                'env': init_environment,
-                'volumeMounts': volume_mounts
             }
-        )
-
-    def _get_init_containers(self):
-        return self.init_containers
-
-    def add_port(self, port):  # type: (Port) -> None
-        """
-        Adds a Port to the generator
-
-        :param port: ports for generated pod
-        :type port: airflow.kubernetes.pod.Port
-        """
-        self.ports.append({'name': port.name, 'containerPort': port.container_port})
-
-    def add_volume(self, volume):  # type: (Volume) -> None
-        """
-        Adds a Volume to the generator
-
-        :param volume: volume for generated pod
-        :type volume: airflow.kubernetes.volume.Volume
-        """
-
-        self._add_volume(name=volume.name, configs=volume.configs)
-
-    def _add_volume(self, name, configs):
-        """
-
-        Args:
-            name (str):
-            configs (dict): Configurations for the volume.
-            Could be used to define PersistentVolumeClaim, ConfigMap, etc...
-
-        Returns:
-
-        """
-        volume_map = {'name': name}
-        for k, v in configs.items():
-            volume_map[k] = v
-
-        self.volumes.append(volume_map)
-
-    def add_volume_with_configmap(self, name, config_map):
-        self.volumes.append(
-            {
-                'name': name,
-                'configMap': config_map
+            limits = {
+                'cpu': namespaced.get('limit_cpu'),
+                'memory': namespaced.get('limit_memory')
             }
+            all_resources = list(requests.values()) + list(limits.values())
+            if all(r is None for r in all_resources):
+                resources = None
+            else:
+                resources = k8s.V1ResourceRequirements(
+                    requests=requests,
+                    limits=limits
+                )
+
+        annotations = namespaced.get('annotations', {})
+        gcp_service_account_key = namespaced.get('gcp_service_account_key', None)
+
+        if annotations is not None and gcp_service_account_key is not None:
+            annotations.update({
+                'iam.cloud.google.com/service-account': gcp_service_account_key
+            })
+
+        pod_spec_generator = PodGenerator(
+            image=namespaced.get('image'),
+            envs=namespaced.get('env'),
+            cmds=namespaced.get('cmds'),
+            args=namespaced.get('args'),
+            labels=namespaced.get('labels'),
+            node_selectors=namespaced.get('node_selectors'),
+            name=namespaced.get('name'),
+            ports=namespaced.get('ports'),
+            volumes=namespaced.get('volumes'),
+            volume_mounts=namespaced.get('volume_mounts'),
+            namespace=namespaced.get('namespace'),
+            image_pull_policy=namespaced.get('image_pull_policy'),
+            restart_policy=namespaced.get('restart_policy'),
+            image_pull_secrets=namespaced.get('image_pull_secrets'),
+            init_containers=namespaced.get('init_containers'),
+            service_account_name=namespaced.get('service_account_name'),
+            resources=resources,
+            annotations=annotations,
+            affinity=namespaced.get('affinity'),
+            hostnetwork=namespaced.get('hostnetwork'),
+            tolerations=namespaced.get('tolerations'),
+            security_context=namespaced.get('security_context'),
+            configmaps=namespaced.get('configmaps'),
+            dnspolicy=namespaced.get('dnspolicy'),
+            pod=namespaced.get('pod'),
+            extract_xcom=namespaced.get('extract_xcom'),
         )
 
-    def _add_mount(self,
-                   name,
-                   mount_path,
-                   sub_path,
-                   read_only):
-        """
+        return pod_spec_generator.gen_pod()
 
-        Args:
-            name (str):
-            mount_path (str):
-            sub_path (str):
-            read_only:
-
-        Returns:
-
-        """
-
-        self.volume_mounts.append({
-            'name': name,
-            'mountPath': mount_path,
-            'subPath': sub_path,
-            'readOnly': read_only
-        })
-
-    def add_mount(self,
-                  volume_mount,  # type: VolumeMount
-                  ):
+    @staticmethod
+    def reconcile_pods(base_pod, client_pod):
         """
-        Adds a VolumeMount to the generator
-
-        :param volume_mount: volume for generated pod
-        :type volume_mount: airflow.kubernetes.volume_mount.VolumeMount
+        :param base_pod: has the base attributes which are overwritten if they exist
+            in the client_pod and remain if they do not exist in the client_pod
+        :type base_pod: k8s.V1Pod
+        :param client_pod: the pod that the client wants to create.
+        :type client_pod: k8s.V1Pod
+        :return: the merged pod
+
+        This can't be done recursively, as certain fields are preserved,
+        some overwritten, and some concatenated: e.g. the command
+        should be preserved from the base, the volumes appended to,
+        and the other fields overwritten.
-        self._add_mount(
-            name=volume_mount.name,
-            mount_path=volume_mount.mount_path,
-            sub_path=volume_mount.sub_path,
-            read_only=volume_mount.read_only
-        )
 
-    def _get_volumes_and_mounts(self):
-        return self.volumes, self.volume_mounts
-
-    def _get_image_pull_secrets(self):
-        """Extracts any image pull secrets for fetching container(s)"""
-        if not self.kube_config.image_pull_secrets:
-            return []
-        return self.kube_config.image_pull_secrets.split(',')
-
-    def make_pod(self, namespace, image, pod_id, cmds, arguments, labels):
-        volumes, volume_mounts = self._get_volumes_and_mounts()
-        worker_init_container_spec = self._get_init_containers()
-
-        return Pod(
-            namespace=namespace,
-            name=pod_id + "-" + str(uuid.uuid4())[:8],
-            image=image,
-            cmds=cmds,
-            args=arguments,
-            labels=labels,
-            envs={},
-            secrets=[],
-            # service_account_name=self.kube_config.worker_service_account_name,
-            # image_pull_secrets=self.kube_config.image_pull_secrets,
-            init_containers=worker_init_container_spec,
-            ports=self.ports,
-            volumes=volumes,
-            volume_mounts=volume_mounts,
-            resources=None
-        )
+        client_pod_cp = copy.deepcopy(client_pod)
+
+        def merge_objects(base_obj, client_obj):
+            for base_key in base_obj.to_dict().keys():
+                base_val = getattr(base_obj, base_key, None)
+                if not getattr(client_obj, base_key, None) and base_val:
+                    setattr(client_obj, base_key, base_val)
+
+        def extend_object_field(base_obj, client_obj, field_name):
+            base_obj_field = getattr(base_obj, field_name, None)
+            client_obj_field = getattr(client_obj, field_name, None)
+            if not base_obj_field:
+                return
+            if not client_obj_field:
+                setattr(client_obj, field_name, base_obj_field)
+                return
+            appended_fields = base_obj_field + client_obj_field
+            setattr(client_obj, field_name, appended_fields)
+
+        # Values at the pod and metadata should be overwritten where they exist,
+        # but certain values at the spec and container level must be conserved.
+        base_container = base_pod.spec.containers[0]
+        client_container = client_pod_cp.spec.containers[0]
+
+        extend_object_field(base_container, client_container, 'volume_mounts')
+        extend_object_field(base_container, client_container, 'env')
+        extend_object_field(base_container, client_container, 'env_from')
+        extend_object_field(base_container, client_container, 'ports')
+        extend_object_field(base_container, client_container, 'volume_devices')
+        client_container.command = base_container.command
+        client_container.args = base_container.args
+        merge_objects(base_container, client_container)
+        # Just append any additional containers from the base pod
+        client_pod_cp.spec.containers.extend(base_pod.spec.containers[1:])
+
+        merge_objects(base_pod.metadata, client_pod_cp.metadata)
+
+        extend_object_field(base_pod.spec, client_pod_cp.spec, 'volumes')
+        merge_objects(base_pod.spec, client_pod_cp.spec)
+        merge_objects(base_pod, client_pod_cp)
+
+        return client_pod_cp
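
For illustration, a minimal sketch of how the refactored generator consumes an
executor_config-style dict via from_obj() above; the image, resource values and
annotation here are invented:

    from airflow.kubernetes.pod_generator import PodGenerator

    # from_obj() reads the dict namespaced under the executor name and
    # emits a kubernetes.client.models.V1Pod via gen_pod().
    executor_config = {
        'KubernetesExecutor': {
            'image': 'python:3.7-alpine',
            'request_cpu': '200m',
            'request_memory': '256Mi',
            'annotations': {'team': 'data-platform'},
        }
    }

    pod = PodGenerator.from_obj(executor_config)
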
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 8a9028b..47d8ed5 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -18,30 +18,22 @@
 import json
 import time
 from datetime import datetime as dt
-from typing import Tuple, Optional
-
-from requests.exceptions import BaseHTTPError
 
 import tenacity
-
 from kubernetes import watch, client
 from kubernetes.client.rest import ApiException
 from kubernetes.stream import stream as kubernetes_stream
+from requests.exceptions import BaseHTTPError
 
+from airflow import AirflowException
+from airflow.kubernetes.pod_generator import PodDefaults
 from airflow.settings import pod_mutation_hook
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
-from airflow import AirflowException
-
-from airflow.kubernetes.pod import Pod
-from airflow.kubernetes.kubernetes_request_factory import \
-    pod_request_factory as pod_factory
-
 from .kube_client import get_kube_client
 
 
-class PodStatus(object):
-    """Status of the PODs"""
+class PodStatus:
+    """Status of the PODs"""
     PENDING = 'pending'
     RUNNING = 'running'
     FAILED = 'failed'
@@ -68,35 +60,36 @@ class PodLauncher(LoggingMixin):
                                                       cluster_context=cluster_context)
         self._watch = watch.Watch()
         self.extract_xcom = extract_xcom
-        self.kube_req_factory = pod_factory.ExtractXcomPodRequestFactory(
-        ) if extract_xcom else pod_factory.SimplePodRequestFactory()
 
     def run_pod_async(self, pod, **kwargs):
         """Runs POD asynchronously"""
         pod_mutation_hook(pod)
 
-        req = self.kube_req_factory.create(pod)
-        self.log.debug('Pod Creation Request: \n%s', json.dumps(req, indent=2))
+        sanitized_pod = self._client.api_client.sanitize_for_serialization(pod)
+        json_pod = json.dumps(sanitized_pod, indent=2)
+
+        self.log.debug('Pod Creation Request: \n%s', json_pod)
         try:
-            resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace, **kwargs)
+            resp = self._client.create_namespaced_pod(body=sanitized_pod,
+                                                      namespace=pod.metadata.namespace, **kwargs)
             self.log.debug('Pod Creation Response: %s', resp)
-        except ApiException:
-            self.log.exception('Exception when attempting to create Namespaced Pod.')
-            raise
+        except Exception:
+            self.log.exception('Exception when attempting '
+                               'to create Namespaced Pod: %s', json_pod)
+            raise
         return resp
 
     def delete_pod(self, pod):
         """Deletes POD"""
         try:
             self._client.delete_namespaced_pod(
-                pod.name, pod.namespace, body=client.V1DeleteOptions())
+                pod.metadata.name, pod.metadata.namespace, body=client.V1DeleteOptions())
         except ApiException as e:
             # If the pod is already deleted
             if e.status != 404:
                 raise
 
     def run_pod(self, pod, startup_timeout=120, get_logs=True):
-        # type: (Pod, int, bool) -> Tuple[State, Optional[str]]
         """
         Launches the pod synchronously and waits for completion.
         Args:
@@ -117,7 +110,6 @@ class PodLauncher(LoggingMixin):
         return self._monitor_pod(pod, get_logs)
 
     def _monitor_pod(self, pod, get_logs):
-        # type: (Pod, bool) -> Tuple[State, Optional[str]]
 
         if get_logs:
             logs = self.read_pod_logs(pod)
@@ -126,13 +118,13 @@ class PodLauncher(LoggingMixin):
         result = None
         if self.extract_xcom:
             while self.base_container_is_running(pod):
-                self.log.info('Container %s has state %s', pod.name, State.RUNNING)
+                self.log.info('Container %s has state %s', pod.metadata.name, State.RUNNING)
                 time.sleep(2)
             result = self._extract_xcom(pod)
             self.log.info(result)
             result = json.loads(result)
         while self.pod_is_running(pod):
-            self.log.info('Pod %s has state %s', pod.name, State.RUNNING)
+            self.log.info('Pod %s has state %s', pod.metadata.name, State.RUNNING)
             time.sleep(2)
         return self._task_status(self.read_pod(pod)), result
 
@@ -158,6 +150,8 @@ class PodLauncher(LoggingMixin):
         event = self.read_pod(pod)
         status = next(iter(filter(lambda s: s.name == 'base',
                                   event.status.container_statuses)), None)
+        if not status:
+            return False
         return status.state.running is not None
 
     @tenacity.retry(
@@ -169,8 +163,8 @@ class PodLauncher(LoggingMixin):
         """Reads log from the POD"""
         try:
             return self._client.read_namespaced_pod_log(
-                name=pod.name,
-                namespace=pod.namespace,
+                name=pod.metadata.name,
+                namespace=pod.metadata.namespace,
                 container='base',
                 follow=True,
                 tail_lines=10,
@@ -189,7 +183,7 @@ class PodLauncher(LoggingMixin):
     def read_pod(self, pod):
         """Read POD information"""
         try:
-            return self._client.read_namespaced_pod(pod.name, pod.namespace)
+            return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
         except BaseHTTPError as e:
             raise AirflowException(
                 'There was an error reading the kubernetes API: {}'.format(e)
@@ -197,19 +191,19 @@ class PodLauncher(LoggingMixin):
 
     def _extract_xcom(self, pod):
         resp = kubernetes_stream(self._client.connect_get_namespaced_pod_exec,
-                                 pod.name, pod.namespace,
-                                 container=self.kube_req_factory.SIDECAR_CONTAINER_NAME,
+                                 pod.metadata.name, pod.metadata.namespace,
+                                 container=PodDefaults.SIDECAR_CONTAINER_NAME,
                                  command=['/bin/sh'], stdin=True, stdout=True,
                                  stderr=True, tty=False,
                                  _preload_content=False)
         try:
             result = self._exec_pod_command(
-                resp, 'cat {}/return.json'.format(self.kube_req_factory.XCOM_MOUNT_PATH))
+                resp, 'cat {}/return.json'.format(PodDefaults.XCOM_MOUNT_PATH))
             self._exec_pod_command(resp, 'kill -s SIGINT 1')
         finally:
             resp.close()
         if result is None:
-            raise AirflowException('Failed to extract xcom from pod: {}'.format(pod.name))
+            raise AirflowException('Failed to extract xcom from pod: {}'.format(pod.metadata.name))
         return result
 
     def _exec_pod_command(self, resp, command):
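
As a reading aid, a rough sketch of driving the launcher with a plain V1Pod.
This assumes a reachable cluster via the default kube client; the pod below is
invented, and the main container must be named 'base' for log and status
handling to work:

    import kubernetes.client.models as k8s
    from airflow.kubernetes.pod_launcher import PodLauncher

    pod = k8s.V1Pod(
        metadata=k8s.V1ObjectMeta(name='demo-pod', namespace='default'),
        spec=k8s.V1PodSpec(restart_policy='Never', containers=[
            k8s.V1Container(name='base', image='python:3.7-alpine',
                            command=['python', '-c', 'print("hello")'])
        ])
    )

    launcher = PodLauncher(extract_xcom=False)
    state, result = launcher.run_pod(pod, startup_timeout=120, get_logs=True)
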
diff --git a/airflow/kubernetes/pod_runtime_info_env.py b/airflow/kubernetes/pod_runtime_info_env.py
index f52791e..7d23a7e 100644
--- a/airflow/kubernetes/pod_runtime_info_env.py
+++ b/airflow/kubernetes/pod_runtime_info_env.py
@@ -15,11 +15,15 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Classes for using Kubernetes Downward API
+Classes for interacting with Kubernetes API
 """
 
+import copy
+import kubernetes.client.models as k8s
+from airflow.kubernetes.k8s_model import K8SModel
 
-class PodRuntimeInfoEnv:
+
+class PodRuntimeInfoEnv(K8SModel):
     """Defines Pod runtime information as environment variable"""
 
     def __init__(self, name, field_path):
@@ -34,3 +38,23 @@ class PodRuntimeInfoEnv:
         """
         self.name = name
         self.field_path = field_path
+
+    def to_k8s_client_obj(self):
+        """
+        :return: kubernetes.client.models.V1EnvVar
+        """
+        return k8s.V1EnvVar(
+            name=self.name,
+            value_from=k8s.V1EnvVarSource(
+                field_ref=k8s.V1ObjectFieldSelector(
+                    field_path=self.field_path
+                )
+            )
+        )
+
+    def attach_to_pod(self, pod):
+        cp_pod = copy.deepcopy(pod)
+        env = self.to_k8s_client_obj()
+        cp_pod.spec.containers[0].env = cp_pod.spec.containers[0].env or []
+        cp_pod.spec.containers[0].env.append(env)
+        return cp_pod
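
A short example of the new behaviour (the env var name and field path are just
the usual Downward API values):

    import kubernetes.client.models as k8s
    from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv

    # Expose the pod's own IP to the task container via the Downward API.
    runtime_env = PodRuntimeInfoEnv(name='POD_IP', field_path='status.podIP')

    pod = k8s.V1Pod(spec=k8s.V1PodSpec(containers=[k8s.V1Container(name='base')]))
    pod = runtime_env.attach_to_pod(pod)
    # pod.spec.containers[0].env now contains a V1EnvVar with a field_ref.
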
diff --git a/airflow/kubernetes/secret.py b/airflow/kubernetes/secret.py
index fde1ded..8591a88 100644
--- a/airflow/kubernetes/secret.py
+++ b/airflow/kubernetes/secret.py
@@ -14,10 +14,18 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
+
+import uuid
+import copy
+import kubernetes.client.models as k8s
 from airflow.exceptions import AirflowConfigException
+from airflow.kubernetes.k8s_model import K8SModel
 
 
-class Secret(object):
+class Secret(K8SModel):
     """Defines Kubernetes Secret Volume"""
 
     def __init__(self, deploy_type, deploy_target, secret, key=None):
@@ -38,6 +46,9 @@ class Secret(object):
             if not provided in `deploy_type` `env` it will mount all secrets in object
         :type key: str or None
         """
+        if deploy_type not in ('env', 'volume'):
+            raise AirflowConfigException("deploy_type must be env or volume")
+
         self.deploy_type = deploy_type
         self.deploy_target = deploy_target
 
@@ -53,6 +64,56 @@ class Secret(object):
         self.secret = secret
         self.key = key
 
+    def to_env_secret(self):
+        return k8s.V1EnvVar(
+            name=self.deploy_target,
+            value_from=k8s.V1EnvVarSource(
+                secret_key_ref=k8s.V1SecretKeySelector(
+                    name=self.secret,
+                    key=self.key
+                )
+            )
+        )
+
+    def to_env_from_secret(self):
+        return k8s.V1EnvFromSource(
+            secret_ref=k8s.V1SecretEnvSource(name=self.secret)
+        )
+
+    def to_volume_secret(self):
+        vol_id = 'secretvol{}'.format(uuid.uuid4())
+        return (
+            k8s.V1Volume(
+                name=vol_id,
+                secret=k8s.V1SecretVolumeSource(
+                    secret_name=self.secret
+                )
+            ),
+            k8s.V1VolumeMount(
+                mount_path=self.deploy_target,
+                name=vol_id,
+                read_only=True
+            )
+        )
+
+    def attach_to_pod(self, pod):
+        cp_pod = copy.deepcopy(pod)
+        if self.deploy_type == 'volume':
+            volume, volume_mount = self.to_volume_secret()
+            cp_pod.spec.volumes = cp_pod.spec.volumes or []
+            cp_pod.spec.volumes.append(volume)
+            cp_pod.spec.containers[0].volume_mounts = cp_pod.spec.containers[0].volume_mounts or []
+            cp_pod.spec.containers[0].volume_mounts.append(volume_mount)
+        if self.deploy_type == 'env' and self.key is not None:
+            env = self.to_env_secret()
+            cp_pod.spec.containers[0].env = cp_pod.spec.containers[0].env or []
+            cp_pod.spec.containers[0].env.append(env)
+        if self.deploy_type == 'env' and self.key is None:
+            env_from = self.to_env_from_secret()
+            cp_pod.spec.containers[0].env_from = cp_pod.spec.containers[0].env_from or []
+            cp_pod.spec.containers[0].env_from.append(env_from)
+        return cp_pod
+
     def __eq__(self, other):
         return (
             self.deploy_type == other.deploy_type and
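
A sketch of the two deploy types side by side (the secret and target names are
invented):

    import kubernetes.client.models as k8s
    from airflow.kubernetes.secret import Secret

    # One key of a Kubernetes secret as an env var, and a whole secret
    # mounted read-only as a volume.
    env_secret = Secret('env', 'SQL_CONN', 'airflow-secrets', key='sql_alchemy_conn')
    vol_secret = Secret('volume', '/etc/sql_conn', 'airflow-secrets')

    pod = k8s.V1Pod(spec=k8s.V1PodSpec(containers=[k8s.V1Container(name='base')]))
    pod = env_secret.attach_to_pod(pod)
    pod = vol_secret.attach_to_pod(pod)
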
diff --git a/airflow/kubernetes/volume.py b/airflow/kubernetes/volume.py
index 94003fe..9d85959 100644
--- a/airflow/kubernetes/volume.py
+++ b/airflow/kubernetes/volume.py
@@ -14,11 +14,15 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
 
+import copy
+from airflow.kubernetes.k8s_model import K8SModel
 
-class Volume:
-    """Defines Kubernetes Volume"""
 
+class Volume(K8SModel):
+    """Defines Kubernetes Volume"""
     def __init__(self, name, configs):
         """ Adds Kubernetes Volume to pod. allows pod to access features like ConfigMaps
         and Persistent Volumes
@@ -31,3 +35,15 @@ class Volume:
         """
         self.name = name
         self.configs = configs
+
+    def to_k8s_client_obj(self):
+        configs = copy.copy(self.configs)  # don't mutate the user-supplied configs
+        configs['name'] = self.name
+        return configs
+
+    def attach_to_pod(self, pod):
+        cp_pod = copy.deepcopy(pod)
+        volume = self.to_k8s_client_obj()
+        cp_pod.spec.volumes = cp_pod.spec.volumes or []
+        cp_pod.spec.volumes.append(volume)
+        return cp_pod
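
For example (the claim name is illustrative):

    import kubernetes.client.models as k8s
    from airflow.kubernetes.volume import Volume

    # configs carries the V1Volume source field(s), here a PVC reference.
    dags_volume = Volume(
        name='airflow-dags',
        configs={'persistentVolumeClaim': {'claimName': 'airflow-dags'}}
    )

    pod = k8s.V1Pod(spec=k8s.V1PodSpec(containers=[k8s.V1Container(name='base')]))
    pod = dags_volume.attach_to_pod(pod)
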
diff --git a/airflow/kubernetes/volume_mount.py b/airflow/kubernetes/volume_mount.py
index 4bdf09c..74563b4 100644
--- a/airflow/kubernetes/volume_mount.py
+++ b/airflow/kubernetes/volume_mount.py
@@ -14,9 +14,16 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
 
+import copy
+import kubernetes.client.models as k8s
+from airflow.kubernetes.k8s_model import K8SModel
 
-class VolumeMount:
+
+class VolumeMount(K8SModel):
     """Defines Kubernetes Volume Mount"""
 
     def __init__(self, name, mount_path, sub_path, read_only):
@@ -35,3 +42,18 @@ class VolumeMount:
         self.mount_path = mount_path
         self.sub_path = sub_path
         self.read_only = read_only
+
+    def to_k8s_client_obj(self):
+        return k8s.V1VolumeMount(
+            name=self.name,
+            mount_path=self.mount_path,
+            sub_path=self.sub_path,
+            read_only=self.read_only
+        )
+
+    def attach_to_pod(self, pod):
+        cp_pod = copy.deepcopy(pod)
+        volume_mount = self.to_k8s_client_obj()
+        cp_pod.spec.containers[0].volume_mounts = cp_pod.spec.containers[0].volume_mounts or []
+        cp_pod.spec.containers[0].volume_mounts.append(volume_mount)
+        return cp_pod
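
And the matching mount (paths are illustrative):

    import kubernetes.client.models as k8s
    from airflow.kubernetes.volume_mount import VolumeMount

    dags_mount = VolumeMount(name='airflow-dags',
                             mount_path='/usr/local/airflow/dags',
                             sub_path=None,
                             read_only=True)

    pod = k8s.V1Pod(spec=k8s.V1PodSpec(containers=[k8s.V1Container(name='base')]))
    pod = dags_mount.attach_to_pod(pod)
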
diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py
index 74e464a..41a65b3 100644
--- a/airflow/kubernetes/worker_configuration.py
+++ b/airflow/kubernetes/worker_configuration.py
@@ -16,10 +16,13 @@
 # under the License.
 
 import os
+
+import kubernetes.client.models as k8s
 import six
 
 from airflow.configuration import conf
-from airflow.kubernetes.pod import Pod, Resources
+from airflow.kubernetes.k8s_model import append_to_pod
+from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.kubernetes.secret import Secret
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.version import version as airflow_version
@@ -51,123 +54,120 @@ class WorkerConfiguration(LoggingMixin):
             return []
 
         # Otherwise, define a git-sync init container
-        init_environment = [{
-            'name': 'GIT_SYNC_REPO',
-            'value': self.kube_config.git_repo
-        }, {
-            'name': 'GIT_SYNC_BRANCH',
-            'value': self.kube_config.git_branch
-        }, {
-            'name': 'GIT_SYNC_ROOT',
-            'value': self.kube_config.git_sync_root
-        }, {
-            'name': 'GIT_SYNC_DEST',
-            'value': self.kube_config.git_sync_dest
-        }, {
-            'name': 'GIT_SYNC_DEPTH',
-            'value': '1'
-        }, {
-            'name': 'GIT_SYNC_ONE_TIME',
-            'value': 'true'
-        }, {
-            'name': 'GIT_SYNC_REV',
-            'value': self.kube_config.git_sync_rev
-        }]
-
+        init_environment = [k8s.V1EnvVar(
+            name='GIT_SYNC_REPO',
+            value=self.kube_config.git_repo
+        ), k8s.V1EnvVar(
+            name='GIT_SYNC_BRANCH',
+            value=self.kube_config.git_branch
+        ), k8s.V1EnvVar(
+            name='GIT_SYNC_ROOT',
+            value=self.kube_config.git_sync_root
+        ), k8s.V1EnvVar(
+            name='GIT_SYNC_DEST',
+            value=self.kube_config.git_sync_dest
+        ), k8s.V1EnvVar(
+            name='GIT_SYNC_DEPTH',
+            value='1'
+        ), k8s.V1EnvVar(
+            name='GIT_SYNC_ONE_TIME',
+            value='true'
+        ), k8s.V1EnvVar(
+            name='GIT_SYNC_REV',
+            value=self.kube_config.git_sync_rev
+        )]
         for env_var_name, env_var_val in six.iteritems(self.kube_config.kube_env_vars):
-            init_environment.append({
-                'name': env_var_name,
-                'value': env_var_val
-            })
-
+            init_environment.append(k8s.V1EnvVar(
+                name=env_var_name,
+                value=env_var_val
+            ))
         if self.kube_config.git_user:
-            init_environment.append({
-                'name': 'GIT_SYNC_USERNAME',
-                'value': self.kube_config.git_user
-            })
+            init_environment.append(k8s.V1EnvVar(
+                name='GIT_SYNC_USERNAME',
+                value=self.kube_config.git_user
+            ))
         if self.kube_config.git_password:
-            init_environment.append({
-                'name': 'GIT_SYNC_PASSWORD',
-                'value': self.kube_config.git_password
-            })
+            init_environment.append(k8s.V1EnvVar(
+                name='GIT_SYNC_PASSWORD',
+                value=self.kube_config.git_password
+            ))
+
+        volume_mounts = [k8s.V1VolumeMount(
+            mount_path=self.kube_config.git_sync_root,
+            name=self.dags_volume_name,
+            read_only=False
+        )]
 
         if self.kube_config.git_sync_credentials_secret:
             init_environment.extend([
-                {
-                    'name': 'GIT_SYNC_USERNAME',
-                    'valueFrom': {
-                        'secretKeyRef': {
-                            'name': self.kube_config.git_sync_credentials_secret,
-                            'key': 'GIT_SYNC_USERNAME'
-                        }
-                    }
-                },
-                {
-                    'name': 'GIT_SYNC_PASSWORD',
-                    'valueFrom': {
-                        'secretKeyRef': {
-                            'name': self.kube_config.git_sync_credentials_secret,
-                            'key': 'GIT_SYNC_PASSWORD'
-                        }
-                    }
-                }
+                k8s.V1EnvVar(
+                    name='GIT_SYNC_USERNAME',
+                    value_from=k8s.V1EnvVarSource(
+                        secret_key_ref=k8s.V1SecretKeySelector(
+                            name=self.kube_config.git_sync_credentials_secret,
+                            key='GIT_SYNC_USERNAME')
+                    )
+                ),
+                k8s.V1EnvVar(
+                    name='GIT_SYNC_PASSWORD',
+                    value_from=k8s.V1EnvVarSource(
+                        secret_key_ref=k8s.V1SecretKeySelector(
+                            name=self.kube_config.git_sync_credentials_secret,
+                            key='GIT_SYNC_PASSWORD')
+                    )
+                )
             ])
 
-        volume_mounts = [{
-            'mountPath': self.kube_config.git_sync_root,
-            'name': self.dags_volume_name,
-            'readOnly': False
-        }]
         if self.kube_config.git_ssh_key_secret_name:
-            volume_mounts.append({
-                'name': self.git_sync_ssh_secret_volume_name,
-                'mountPath': '/etc/git-secret/ssh',
-                'subPath': 'ssh'
-            })
-            init_environment.extend([
-                {
-                    'name': 'GIT_SSH_KEY_FILE',
-                    'value': '/etc/git-secret/ssh'
-                },
-                {
-                    'name': 'GIT_SYNC_SSH',
-                    'value': 'true'
-                }])
-        if self.kube_config.git_ssh_known_hosts_configmap_name:
-            volume_mounts.append({
-                'name': self.git_sync_ssh_known_hosts_volume_name,
-                'mountPath': '/etc/git-secret/known_hosts',
-                'subPath': 'known_hosts'
-            })
+            volume_mounts.append(k8s.V1VolumeMount(
+                name=self.git_sync_ssh_secret_volume_name,
+                mount_path='/etc/git-secret/ssh',
+                sub_path='ssh'
+            ))
+
             init_environment.extend([
-                {
-                    'name': 'GIT_KNOWN_HOSTS',
-                    'value': 'true'
-                },
-                {
-                    'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
-                    'value': '/etc/git-secret/known_hosts'
-                }
+                k8s.V1EnvVar(
+                    name='GIT_SSH_KEY_FILE',
+                    value='/etc/git-secret/ssh'
+                ),
+                k8s.V1EnvVar(
+                    name='GIT_SYNC_SSH',
+                    value='true'
+                )
             ])
+
+        if self.kube_config.git_ssh_known_hosts_configmap_name:
+            volume_mounts.append(k8s.V1VolumeMount(
+                name=self.git_sync_ssh_known_hosts_volume_name,
+                mount_path='/etc/git-secret/known_hosts',
+                sub_path='known_hosts'
+            ))
+            init_environment.extend([k8s.V1EnvVar(
+                name='GIT_KNOWN_HOSTS',
+                value='true'
+            ), k8s.V1EnvVar(
+                name='GIT_SSH_KNOWN_HOSTS_FILE',
+                value='/etc/git-secret/known_hosts'
+            )])
         else:
-            init_environment.append({
-                'name': 'GIT_KNOWN_HOSTS',
-                'value': 'false'
-            })
-
-        init_containers = [{
-            'name': self.kube_config.git_sync_init_container_name,
-            'image': self.kube_config.git_sync_container,
-            'env': init_environment,
-            'volumeMounts': volume_mounts
-        }]
+            init_environment.append(k8s.V1EnvVar(
+                name='GIT_KNOWN_HOSTS',
+                value='false'
+            ))
+
+        init_container = k8s.V1Container(
+            name=self.kube_config.git_sync_init_container_name,
+            image=self.kube_config.git_sync_container,
+            env=init_environment,
+            volume_mounts=volume_mounts
+        )
 
         if self.kube_config.git_sync_run_as_user != "":
-            init_containers[0]['securityContext'] = {
-                'runAsUser': self.kube_config.git_sync_run_as_user  # git-sync user
-            }
+            init_container.security_context = k8s.V1SecurityContext(
+                run_as_user=self.kube_config.git_sync_run_as_user or 65533
+            )  # git-sync user
 
-        return init_containers
+        return [init_container]
 
     def _get_environment(self):
         """Defines any necessary environment variables for the pod executor"""
@@ -196,9 +196,39 @@ class WorkerConfiguration(LoggingMixin):
 
     def _get_configmaps(self):
         """Extracts any configmapRefs to envFrom"""
-        if not self.kube_config.env_from_configmap_ref:
-            return []
-        return self.kube_config.env_from_configmap_ref.split(',')
+        env_from = []
+
+        if self.kube_config.env_from_configmap_ref:
+            for config_map_ref in self.kube_config.env_from_configmap_ref.split(','):
+                env_from.append(
+                    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(config_map_ref))
+                )
+
+        if self.kube_config.env_from_secret_ref:
+            for secret_ref in self.kube_config.env_from_secret_ref.split(','):
+                env_from.append(
+                    k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(secret_ref))
+                )
+
+        return env_from
+
+    def _get_env_from(self):
+        """Extracts any configmapRefs to envFrom"""
+        env_from = []
+
+        if self.kube_config.env_from_configmap_ref:
+            for config_map_ref in self.kube_config.env_from_configmap_ref.split(','):
+                env_from.append(
+                    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(config_map_ref))
+                )
+
+        if self.kube_config.env_from_secret_ref:
+            for secret_ref in self.kube_config.env_from_secret_ref.split(','):
+                env_from.append(
+                    k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(secret_ref))
+                )
+
+        return env_from
 
     def _get_secrets(self):
         """Defines any necessary secrets for the pod executor"""
@@ -222,21 +252,23 @@ class WorkerConfiguration(LoggingMixin):
         """Extracts any image pull secrets for fetching container(s)"""
         if not self.kube_config.image_pull_secrets:
             return []
-        return self.kube_config.image_pull_secrets.split(',')
+        pull_secrets = self.kube_config.image_pull_secrets.split(',')
+        return [k8s.V1LocalObjectReference(name) for name in pull_secrets]
 
     def _get_security_context(self):
         """Defines the security context"""
-        security_context = {}
+
+        security_context = k8s.V1PodSecurityContext()
 
         if self.kube_config.worker_run_as_user != "":
-            security_context['runAsUser'] = self.kube_config.worker_run_as_user
+            security_context.run_as_user = self.kube_config.worker_run_as_user
 
         if self.kube_config.worker_fs_group != "":
-            security_context['fsGroup'] = self.kube_config.worker_fs_group
+            security_context.fs_group = self.kube_config.worker_fs_group
 
         # set fs_group to 65533 if not explicitly specified and using git ssh keypair auth
-        if self.kube_config.git_ssh_key_secret_name and security_context.get('fsGroup') is None:
-            security_context['fsGroup'] = 65533
+        if self.kube_config.git_ssh_key_secret_name and security_context.fs_group is None:
+            security_context.fs_group = 65533
 
         return security_context
 
@@ -246,22 +278,77 @@ class WorkerConfiguration(LoggingMixin):
         copy.update(labels)
         return copy
 
-    def _get_volumes_and_mounts(self):
+    def _get_volume_mounts(self):
+        volume_mounts = {
+            self.dags_volume_name: k8s.V1VolumeMount(
+                name=self.dags_volume_name,
+                mount_path=self.generate_dag_volume_mount_path(),
+                read_only=True,
+            ),
+            self.logs_volume_name: k8s.V1VolumeMount(
+                name=self.logs_volume_name,
+                mount_path=self.worker_airflow_logs,
+            )
+        }
+
+        if self.kube_config.dags_volume_subpath:
+            volume_mounts[self.dags_volume_name].sub_path = self.kube_config.dags_volume_subpath
+
+        if self.kube_config.logs_volume_subpath:
+            volume_mounts[self.logs_volume_name].sub_path = self.kube_config.logs_volume_subpath
+
+        if self.kube_config.dags_in_image:
+            del volume_mounts[self.dags_volume_name]
+
+        # Mount the airflow.cfg file via a configmap the user has specified
+        if self.kube_config.airflow_configmap:
+            config_volume_name = 'airflow-config'
+            config_path = '{}/airflow.cfg'.format(self.worker_airflow_home)
+            volume_mounts[config_volume_name] = k8s.V1VolumeMount(
+                name=config_volume_name,
+                mount_path=config_path,
+                sub_path='airflow.cfg',
+                read_only=True
+            )
+        if self.kube_config.airflow_local_settings_configmap:
+            config_path = '{}/config/airflow_local_settings.py'.format(self.worker_airflow_home)
+
+            if self.kube_config.airflow_local_settings_configmap != self.kube_config.airflow_configmap:
+                config_volume_name = 'airflow-local-settings'
+
+                volume_mounts[config_volume_name] = k8s.V1VolumeMount(
+                    name=config_volume_name,
+                    mount_path=config_path,
+                    sub_path='airflow_local_settings.py',
+                    read_only=True
+                )
+
+            else:
+                volume_mounts['airflow-local-settings'] = k8s.V1VolumeMount(
+                    name='airflow-config',
+                    mount_path=config_path,
+                    sub_path='airflow_local_settings.py',
+                    read_only=True
+                )
+
+        return list(volume_mounts.values())
+
+    def _get_volumes(self):
         def _construct_volume(name, claim, host):
-            volume = {
-                'name': name
-            }
+            volume = k8s.V1Volume(name=name)
+
             if claim:
-                volume['persistentVolumeClaim'] = {
-                    'claimName': claim
-                }
+                volume.persistent_volume_claim = k8s.V1PersistentVolumeClaimVolumeSource(
+                    claim_name=claim
+                )
             elif host:
-                volume['hostPath'] = {
-                    'path': host,
-                    'type': ''
-                }
+                volume.host_path = k8s.V1HostPathVolumeSource(
+                    path=host,
+                    type=''
+                )
             else:
-                volume['emptyDir'] = {}
+                volume.empty_dir = k8s.V1EmptyDirVolumeSource()
+
             return volume
 
         volumes = {
@@ -277,135 +364,68 @@ class WorkerConfiguration(LoggingMixin):
             )
         }
 
-        volume_mounts = {
-            self.dags_volume_name: {
-                'name': self.dags_volume_name,
-                'mountPath': self.generate_dag_volume_mount_path(),
-                'readOnly': True,
-            },
-            self.logs_volume_name: {
-                'name': self.logs_volume_name,
-                'mountPath': self.worker_airflow_logs,
-            }
-        }
-
-        if self.kube_config.dags_volume_subpath:
-            volume_mounts[self.dags_volume_name]['subPath'] = self.kube_config.dags_volume_subpath
-
-        if self.kube_config.logs_volume_subpath:
-            volume_mounts[self.logs_volume_name]['subPath'] = self.kube_config.logs_volume_subpath
-
         if self.kube_config.dags_in_image:
             del volumes[self.dags_volume_name]
-            del volume_mounts[self.dags_volume_name]
 
         # Get the SSH key from secrets as a volume
         if self.kube_config.git_ssh_key_secret_name:
-            volumes[self.git_sync_ssh_secret_volume_name] = {
-                'name': self.git_sync_ssh_secret_volume_name,
-                'secret': {
-                    'secretName': self.kube_config.git_ssh_key_secret_name,
-                    'items': [{
-                        'key': self.git_ssh_key_secret_key,
-                        'path': 'ssh',
-                        'mode': 0o440
-                    }]
-                }
-            }
+            volumes[self.git_sync_ssh_secret_volume_name] = k8s.V1Volume(
+                name=self.git_sync_ssh_secret_volume_name,
+                secret=k8s.V1SecretVolumeSource(
+                    secret_name=self.kube_config.git_ssh_key_secret_name,
+                    items=[k8s.V1KeyToPath(
+                        key=self.git_ssh_key_secret_key,
+                        path='ssh',
+                        mode=0o440
+                    )]
+                )
+            )
 
         if self.kube_config.git_ssh_known_hosts_configmap_name:
-            volumes[self.git_sync_ssh_known_hosts_volume_name] = {
-                'name': self.git_sync_ssh_known_hosts_volume_name,
-                'configMap': {
-                    'name': self.kube_config.git_ssh_known_hosts_configmap_name
-                },
-                'mode': 0o440
-            }
+            volumes[self.git_sync_ssh_known_hosts_volume_name] = k8s.V1Volume(
+                name=self.git_sync_ssh_known_hosts_volume_name,
+                config_map=k8s.V1ConfigMapVolumeSource(
+                    name=self.kube_config.git_ssh_known_hosts_configmap_name,
+                    default_mode=0o440
+                )
+            )
 
-        if self.kube_config.airflow_local_settings_configmap:
-            config_path = '{}/config/airflow_local_settings.py'.format(self.worker_airflow_home)
+        # Mount the airflow.cfg file via a configmap the user has specified
+        if self.kube_config.airflow_configmap:
+            config_volume_name = 'airflow-config'
+            volumes[config_volume_name] = k8s.V1Volume(
+                name=config_volume_name,
+                config_map=k8s.V1ConfigMapVolumeSource(
+                    name=self.kube_config.airflow_configmap
+                )
+            )
 
+        if self.kube_config.airflow_local_settings_configmap:
             if self.kube_config.airflow_local_settings_configmap != self.kube_config.airflow_configmap:
                 config_volume_name = 'airflow-local-settings'
-                volumes[config_volume_name] = {
-                    'name': config_volume_name,
-                    'configMap': {
-                        'name': self.kube_config.airflow_local_settings_configmap
-                    }
-                }
-
-                volume_mounts[config_volume_name] = {
-                    'name': config_volume_name,
-                    'mountPath': config_path,
-                    'subPath': 'airflow_local_settings.py',
-                    'readOnly': True
-                }
+                volumes[config_volume_name] = k8s.V1Volume(
+                    name=config_volume_name,
+                    config_map=k8s.V1ConfigMapVolumeSource(
+                        name=self.kube_config.airflow_local_settings_configmap
+                    )
+                )
 
-            else:
-                volume_mounts['airflow-local-settings'] = {
-                    'name': 'airflow-config',
-                    'mountPath': config_path,
-                    'subPath': 'airflow_local_settings.py',
-                    'readOnly': True
-                }
-
-        # Mount the airflow.cfg file via a configmap the user has specified
-        if self.kube_config.airflow_configmap:
-            config_volume_name = 'airflow-config'
-            config_path = '{}/airflow.cfg'.format(self.worker_airflow_home)
-            volumes[config_volume_name] = {
-                'name': config_volume_name,
-                'configMap': {
-                    'name': self.kube_config.airflow_configmap
-                }
-            }
-            volume_mounts[config_volume_name] = {
-                'name': config_volume_name,
-                'mountPath': config_path,
-                'subPath': 'airflow.cfg',
-                'readOnly': True
-            }
-
-        return volumes, volume_mounts
+        return list(volumes.values())
 
     def generate_dag_volume_mount_path(self):
         if self.kube_config.dags_volume_claim or self.kube_config.dags_volume_host:
-            dag_volume_mount_path = self.worker_airflow_dags
-        else:
-            dag_volume_mount_path = self.kube_config.git_dags_folder_mount_point
+            return self.worker_airflow_dags
 
-        return dag_volume_mount_path
+        return self.kube_config.git_dags_folder_mount_point
 
     def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_date,
-                 try_number, airflow_command, kube_executor_config):
-        volumes_dict, volume_mounts_dict = self._get_volumes_and_mounts()
-        worker_init_container_spec = self._get_init_containers()
-        resources = Resources(
-            request_memory=kube_executor_config.request_memory,
-            request_cpu=kube_executor_config.request_cpu,
-            limit_memory=kube_executor_config.limit_memory,
-            limit_cpu=kube_executor_config.limit_cpu,
-            limit_gpu=kube_executor_config.limit_gpu
-        )
-        gcp_sa_key = kube_executor_config.gcp_service_account_key
-        annotations = dict(kube_executor_config.annotations) or self.kube_config.kube_annotations
-        if gcp_sa_key:
-            annotations['iam.cloud.google.com/service-account'] = gcp_sa_key
-
-        volumes = [value for value in volumes_dict.values()] + kube_executor_config.volumes
-        volume_mounts = [value for value in volume_mounts_dict.values()] + kube_executor_config.volume_mounts
-
-        affinity = kube_executor_config.affinity or self.kube_config.kube_affinity
-        tolerations = kube_executor_config.tolerations or self.kube_config.kube_tolerations
-
-        return Pod(
+                 try_number, airflow_command):
+        pod_generator = PodGenerator(
             namespace=namespace,
             name=pod_id,
-            image=kube_executor_config.image or self.kube_config.kube_image,
-            image_pull_policy=(kube_executor_config.image_pull_policy or
-                               self.kube_config.kube_image_pull_policy),
-            cmds=airflow_command,
-            labels=self._get_labels(kube_executor_config.labels, {
+            image=self.kube_config.kube_image,
+            image_pull_policy=self.kube_config.kube_image_pull_policy,
+            labels={
                 'airflow-worker': worker_uuid,
                 'dag_id': dag_id,
                 'task_id': task_id,
@@ -413,20 +433,22 @@ class WorkerConfiguration(LoggingMixin):
                 'try_number': str(try_number),
                 'airflow_version': airflow_version.replace('+', '-'),
                 'kubernetes_executor': 'True',
-            }),
+            },
+            cmds=airflow_command,
+            volumes=self._get_volumes(),
+            volume_mounts=self._get_volume_mounts(),
+            init_containers=self._get_init_containers(),
+            annotations=self.kube_config.kube_annotations,
+            affinity=self.kube_config.kube_affinity,
+            tolerations=self.kube_config.kube_tolerations,
             envs=self._get_environment(),
-            secrets=self._get_secrets(),
+            node_selectors=self.kube_config.kube_node_selectors,
             service_account_name=self.kube_config.worker_service_account_name,
-            image_pull_secrets=self.kube_config.image_pull_secrets,
-            init_containers=worker_init_container_spec,
-            volumes=volumes,
-            volume_mounts=volume_mounts,
-            resources=resources,
-            annotations=annotations,
-            node_selectors=(kube_executor_config.node_selectors or
-                            self.kube_config.kube_node_selectors),
-            affinity=affinity,
-            tolerations=tolerations,
-            security_context=self._get_security_context(),
-            configmaps=self._get_configmaps()
         )
+
+        pod = pod_generator.gen_pod()
+        pod.spec.containers[0].env_from = pod.spec.containers[0].env_from or []
+        pod.spec.containers[0].env_from.extend(self._get_env_from())
+        pod.spec.security_context = self._get_security_context()
+
+        return append_to_pod(pod, self._get_secrets())
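
The append_to_pod helper used here lives in airflow/kubernetes/k8s_model.py
(not shown in this diff); in essence it folds attach_to_pod over the pod, along
the lines of this sketch:

    from functools import reduce

    def append_to_pod(pod, k8s_objects):
        """Attach each K8SModel to the pod, threading the copies through."""
        if not k8s_objects:
            return pod
        return reduce(lambda p, obj: obj.attach_to_pod(p), k8s_objects, pod)
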
diff --git a/airflow/settings.py b/airflow/settings.py
index 6f82e5d..bf2fb1d 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -182,8 +182,9 @@ def policy(task_instance):
 
 def pod_mutation_hook(pod):
     """
-    This setting allows altering ``Pod`` objects before they are passed to
-    the Kubernetes client by the ``PodLauncher`` for scheduling.
+    This setting allows altering ``kubernetes.client.models.V1Pod`` objects
+    before they are passed to the Kubernetes client by the ``PodLauncher``
+    for scheduling.
 
     To define a pod mutation hook, add a ``airflow_local_settings`` module
     to your PYTHONPATH that defines this ``pod_mutation_hook`` function.
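
A user-supplied hook in airflow_local_settings might now look like this (the
label and proxy values are invented):

    import kubernetes.client.models as k8s

    def pod_mutation_hook(pod):
        # pod is a kubernetes.client.models.V1Pod; mutate it in place.
        pod.metadata.labels = pod.metadata.labels or {}
        pod.metadata.labels['cost-center'] = 'data-eng'
        pod.spec.containers[0].env = pod.spec.containers[0].env or []
        pod.spec.containers[0].env.append(
            k8s.V1EnvVar(name='HTTP_PROXY', value='http://proxy.internal:3128')
        )
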
diff --git a/scripts/ci/in_container/kubernetes/app/deploy_app.sh b/scripts/ci/in_container/kubernetes/app/deploy_app.sh
index 6ebc7b0..1f7832d 100755
--- a/scripts/ci/in_container/kubernetes/app/deploy_app.sh
+++ b/scripts/ci/in_container/kubernetes/app/deploy_app.sh
@@ -104,13 +104,17 @@ cat "${BUILD_DIRNAME}/configmaps.yaml"
 kubectl delete -f "${MY_DIR}/postgres.yaml" || true
 kubectl delete -f "${BUILD_DIRNAME}/airflow.yaml" || true
 kubectl delete -f "${MY_DIR}/secrets.yaml" || true
+kubectl apply -f "${MY_DIR}/secrets.yaml" -n test-namespace
 
 set -e
 
 kubectl apply -f "${MY_DIR}/secrets.yaml"
+kubectl apply -f "${MY_DIR}/secrets.yaml" -n test-namespace
 kubectl apply -f "${BUILD_DIRNAME}/configmaps.yaml"
+kubectl apply -f "${BUILD_DIRNAME}/configmaps.yaml" -n test-namespace
 kubectl apply -f "${MY_DIR}/postgres.yaml"
 kubectl apply -f "${MY_DIR}/volumes.yaml"
+kubectl apply -f "${MY_DIR}/volumes.yaml" -n test-namespace
 
 set +x
 set +o pipefail
diff --git a/scripts/ci/in_container/kubernetes/app/secrets.yaml b/scripts/ci/in_container/kubernetes/app/secrets.yaml
index fa8ae16..34571ed 100644
--- a/scripts/ci/in_container/kubernetes/app/secrets.yaml
+++ b/scripts/ci/in_container/kubernetes/app/secrets.yaml
@@ -22,5 +22,5 @@ metadata:
 type: Opaque
 data:
   # The sql_alchemy_conn value is a base64 encoded representation of this connection string:
-  # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
-  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=
+  # postgresql+psycopg2://root:root@postgres-airflow.default.svc.cluster.local:5432/airflow
+  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93LmRlZmF1bHQuc3ZjLmNsdXN0ZXIubG9jYWw6NTQzMi9haXJmbG93 # yamllint disable-line
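
The encoded value can be regenerated like so (note the new string is encoded
without the trailing newline the old value carried):

    import base64

    conn = ('postgresql+psycopg2://root:root@'
            'postgres-airflow.default.svc.cluster.local:5432/airflow')
    print(base64.b64encode(conn.encode()).decode())
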
diff --git a/scripts/ci/in_container/kubernetes/app/volumes.yaml b/scripts/ci/in_container/kubernetes/app/volumes.yaml
index 5d366a1..0e3af06 100644
--- a/scripts/ci/in_container/kubernetes/app/volumes.yaml
+++ b/scripts/ci/in_container/kubernetes/app/volumes.yaml
@@ -21,7 +21,7 @@ metadata:
   name: airflow-dags
 spec:
   accessModes:
-    - ReadWriteOnce
+    - ReadWriteMany
   capacity:
     storage: 2Gi
   hostPath:
@@ -36,7 +36,7 @@ spec:
     - ReadWriteMany
   resources:
     requests:
-      storage: 2Gi
+      storage: 2G
 ---
 kind: PersistentVolume
 apiVersion: v1
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 95333fc..9073493 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -16,19 +16,16 @@
 # under the License.
 #
 
-import uuid
+import random
 import re
 import string
-import random
-
-from airflow.utils import timezone
-from urllib3 import HTTPResponse
 from datetime import datetime
 
 import six
+from urllib3 import HTTPResponse
 
+from airflow.utils import timezone
 from tests.compat import mock
-from tests.test_utils.config import conf_vars
 
 try:
     from kubernetes.client.rest import ApiException
@@ -36,15 +33,8 @@ try:
     from airflow.configuration import conf  # noqa: F401
     from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
     from airflow.executors.kubernetes_executor import KubernetesExecutor
-    from airflow.executors.kubernetes_executor import KubeConfig
-    from airflow.executors.kubernetes_executor import KubernetesExecutorConfig
-    from airflow.kubernetes.worker_configuration import WorkerConfiguration
-    from airflow.exceptions import AirflowConfigException
     from airflow.utils.state import State
-    from airflow.version import version as airflow_version
-    from airflow.kubernetes.secret import Secret
-except ImportError as e:
-    print(e)
+except ImportError:
     AirflowKubernetesScheduler = None  # type: ignore
 
 if six.PY2:
@@ -130,852 +120,6 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
         self.assertEqual(datetime_obj, new_datetime_obj)
 
 
-class TestKubernetesWorkerConfiguration(unittest.TestCase):
-    """
-    Tests that if dags_volume_subpath/logs_volume_subpath configuration
-    options are passed to worker pod config
-    """
-
-    affinity_config = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [
-                            {
-                                'key': 'app',
-                                'operator': 'In',
-                                'values': ['airflow']
-                            }
-                        ]
-                    }
-                }
-            ]
-        }
-    }
-
-    tolerations_config = [
-        {
-            'key': 'dedicated',
-            'operator': 'Equal',
-            'value': 'airflow'
-        },
-        {
-            'key': 'prod',
-            'operator': 'Exists'
-        }
-    ]
-
-    def setUp(self):
-        if AirflowKubernetesScheduler is None:
-            self.skipTest("kubernetes python package is not installed")
-
-        self.resources = mock.patch(
-            'airflow.kubernetes.worker_configuration.Resources'
-        )
-
-        for patcher in [self.resources]:
-            self.mock_foo = patcher.start()
-            self.addCleanup(patcher.stop)
-
-        self.kube_config = mock.MagicMock()
-        self.kube_config.airflow_home = '/'
-        self.kube_config.airflow_dags = 'dags'
-        self.kube_config.airflow_dags = 'logs'
-        self.kube_config.dags_volume_subpath = None
-        self.kube_config.logs_volume_subpath = None
-        self.kube_config.dags_in_image = False
-        self.kube_config.dags_folder = None
-        self.kube_config.git_dags_folder_mount_point = None
-        self.kube_config.kube_labels = {'dag_id': 'original_dag_id', 'my_label': 'label_id'}
-
-    def tearDown(self):
-        self.kube_config = None
-
-    def test_worker_configuration_no_subpaths(self):
-        worker_config = WorkerConfiguration(self.kube_config)
-        volumes, volume_mounts = worker_config._get_volumes_and_mounts()
-        volumes_list = [value for value in volumes.values()]
-        volume_mounts_list = [value for value in volume_mounts.values()]
-        for volume_or_mount in volumes_list + volume_mounts_list:
-            if volume_or_mount['name'] not in ['airflow-config', 'airflow-local-settings']:
-                self.assertNotIn(
-                    'subPath', volume_or_mount,
-                    "subPath shouldn't be defined"
-                )
-
-    @conf_vars({
-        ('kubernetes', 'git_ssh_known_hosts_configmap_name'): 'airflow-configmap',
-        ('kubernetes', 'git_ssh_key_secret_name'): 'airflow-secrets',
-        ('kubernetes', 'git_user'): 'some-user',
-        ('kubernetes', 'git_password'): 'some-password',
-        ('kubernetes', 'git_repo'): 'git@github.com:apache/airflow.git',
-        ('kubernetes', 'git_branch'): 'master',
-        ('kubernetes', 'git_dags_folder_mount_point'): '/usr/local/airflow/dags',
-        ('kubernetes', 'delete_worker_pods'): 'True',
-        ('kubernetes', 'kube_client_request_args'): '{"_request_timeout" : [60,360]}',
-    })
-    def test_worker_configuration_auth_both_ssh_and_user(self):
-        with self.assertRaisesRegex(AirflowConfigException,
-                                    'either `git_user` and `git_password`.*'
-                                    'or `git_ssh_key_secret_name`.*'
-                                    'but not both$'):
-            KubeConfig()
-
-    def test_worker_with_subpaths(self):
-        self.kube_config.dags_volume_subpath = 'dags'
-        self.kube_config.logs_volume_subpath = 'logs'
-        worker_config = WorkerConfiguration(self.kube_config)
-        volumes, volume_mounts = worker_config._get_volumes_and_mounts()
-
-        for volume in [value for value in volumes.values()]:
-            self.assertNotIn(
-                'subPath', volume,
-                "subPath isn't valid configuration for a volume"
-            )
-
-        for volume_mount in [value for value in volume_mounts.values()]:
-            if volume_mount['name'] != 'airflow-config':
-                self.assertIn(
-                    'subPath', volume_mount,
-                    "subPath should've been passed to volumeMount configuration"
-                )
-
-    def test_worker_generate_dag_volume_mount_path(self):
-        self.kube_config.git_dags_folder_mount_point = '/root/airflow/git/dags'
-        self.kube_config.dags_folder = '/root/airflow/dags'
-        worker_config = WorkerConfiguration(self.kube_config)
-
-        self.kube_config.dags_volume_claim = 'airflow-dags'
-        self.kube_config.dags_volume_host = ''
-        dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
-        self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder)
-
-        self.kube_config.dags_volume_claim = ''
-        self.kube_config.dags_volume_host = '/host/airflow/dags'
-        dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
-        self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder)
-
-        self.kube_config.dags_volume_claim = ''
-        self.kube_config.dags_volume_host = ''
-        dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
-        self.assertEqual(dag_volume_mount_path,
-                         self.kube_config.git_dags_folder_mount_point)
-
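The three assertions above pin down a simple precedence for the DAG mount path: a
volume claim or a host path keeps the DAGs at `dags_folder`, and only when neither
is set does the git-sync mount point win. A minimal standalone sketch of that rule,
inferred solely from these assertions (illustrative, not the patched method itself):

    def generate_dag_volume_mount_path(dags_volume_claim, dags_volume_host,
                                       dags_folder, git_dags_folder_mount_point):
        # A PVC or host path serving the DAGs keeps them at dags_folder;
        # otherwise git-sync delivers them to its own mount point.
        if dags_volume_claim or dags_volume_host:
            return dags_folder
        return git_dags_folder_mount_point
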
-    def test_worker_environment_no_dags_folder(self):
-        self.kube_config.airflow_configmap = ''
-        self.kube_config.git_dags_folder_mount_point = ''
-        self.kube_config.dags_folder = ''
-        worker_config = WorkerConfiguration(self.kube_config)
-        env = worker_config._get_environment()
-
-        self.assertNotIn('AIRFLOW__CORE__DAGS_FOLDER', env)
-
-    def test_worker_environment_when_dags_folder_specified(self):
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_dags_folder_mount_point = ''
-        dags_folder = '/workers/path/to/dags'
-        self.kube_config.dags_folder = dags_folder
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        env = worker_config._get_environment()
-
-        self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
-
-    def test_worker_environment_dags_folder_using_git_sync(self):
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_sync_dest = 'repo'
-        self.kube_config.git_subpath = 'dags'
-        self.kube_config.git_dags_folder_mount_point = '/workers/path/to/dags'
-
-        dags_folder = '{}/{}/{}'.format(self.kube_config.git_dags_folder_mount_point,
-                                        self.kube_config.git_sync_dest,
-                                        self.kube_config.git_subpath)
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        env = worker_config._get_environment()
-
-        self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
-
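The expected `dags_folder` above is simply the git-sync mount point, the sync
destination and the repo subpath joined together. A tiny sketch with the test's
values (`posixpath.join` is our illustration; the test itself uses `str.format`):

    import posixpath

    # '/workers/path/to/dags' + 'repo' + 'dags' -> '/workers/path/to/dags/repo/dags'
    dags_folder = posixpath.join('/workers/path/to/dags', 'repo', 'dags')
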
-    def test_init_environment_using_git_sync_ssh_without_known_hosts(self):
-        # Tests that the init container environment is correct for the git-sync
-        # SSH authentication option without a known hosts file
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_ssh_secret_name = 'airflow-secrets'
-        self.kube_config.git_ssh_known_hosts_configmap_name = None
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        init_containers = worker_config._get_init_containers()
-
-        self.assertTrue(init_containers)  # check not empty
-        env = init_containers[0]['env']
-
-        self.assertTrue({'name': 'GIT_SSH_KEY_FILE', 'value': '/etc/git-secret/ssh'} in env)
-        self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'false'} in env)
-        self.assertTrue({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
-
-    def test_init_environment_using_git_sync_ssh_with_known_hosts(self):
-        # Tests that the init container environment is correct for the git-sync
-        # SSH authentication option with a known hosts file
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
-        self.kube_config.git_ssh_known_hosts_configmap_name = 'airflow-configmap'
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        init_containers = worker_config._get_init_containers()
-
-        self.assertTrue(init_containers)  # check not empty
-        env = init_containers[0]['env']
-
-        self.assertTrue({'name': 'GIT_SSH_KEY_FILE', 'value': '/etc/git-secret/ssh'} in env)
-        self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'true'} in env)
-        self.assertTrue({'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
-                         'value': '/etc/git-secret/known_hosts'} in env)
-        self.assertTrue({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
-
-    def test_init_environment_using_git_sync_user_without_known_hosts(self):
-        # Tests that the init container environment is correct for the git-sync
-        # user authentication option without a known hosts file
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_user = 'git_user'
-        self.kube_config.git_password = 'git_password'
-        self.kube_config.git_ssh_known_hosts_configmap_name = None
-        self.kube_config.git_ssh_key_secret_name = None
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        init_containers = worker_config._get_init_containers()
-
-        self.assertTrue(init_containers)  # check not empty
-        env = init_containers[0]['env']
-
-        self.assertFalse({'name': 'GIT_SSH_KEY_FILE', 'value': '/etc/git-secret/ssh'} in env)
-        self.assertTrue({'name': 'GIT_SYNC_USERNAME', 'value': 'git_user'} in env)
-        self.assertTrue({'name': 'GIT_SYNC_PASSWORD', 'value': 'git_password'} in env)
-        self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'false'} in env)
-        self.assertFalse({'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
-                          'value': '/etc/git-secret/known_hosts'} in env)
-        self.assertFalse({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
-
-    def test_init_environment_using_git_sync_user_with_known_hosts(self):
-        # Tests that the init container environment is correct for the git-sync
-        # user authentication option with a known hosts file
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_user = 'git_user'
-        self.kube_config.git_password = 'git_password'
-        self.kube_config.git_ssh_known_hosts_configmap_name = 'airflow-configmap'
-        self.kube_config.git_ssh_key_secret_name = None
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        init_containers = worker_config._get_init_containers()
-
-        self.assertTrue(init_containers)  # check not empty
-        env = init_containers[0]['env']
-
-        self.assertFalse({'name': 'GIT_SSH_KEY_FILE', 'value': '/etc/git-secret/ssh'} in env)
-        self.assertTrue({'name': 'GIT_SYNC_USERNAME', 'value': 'git_user'} in env)
-        self.assertTrue({'name': 'GIT_SYNC_PASSWORD', 'value': 'git_password'} in env)
-        self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'true'} in env)
-        self.assertTrue({'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
-                         'value': '/etc/git-secret/known_hosts'} in env)
-        self.assertFalse({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
-
-    def test_make_pod_git_sync_credentials_secret(self):
-        # Tests that the credentials from git_sync_credentials_secret get into the init container
-        self.kube_config.git_sync_credentials_secret = 'airflow-git-creds-secret'
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-        self.kube_config.worker_fs_group = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        username_env = {
-            'name': 'GIT_SYNC_USERNAME',
-            'valueFrom': {
-                'secretKeyRef': {
-                    'name': self.kube_config.git_sync_credentials_secret,
-                    'key': 'GIT_SYNC_USERNAME'
-                }
-            }
-        }
-        password_env = {
-            'name': 'GIT_SYNC_PASSWORD',
-            'valueFrom': {
-                'secretKeyRef': {
-                    'name': self.kube_config.git_sync_credentials_secret,
-                    'key': 'GIT_SYNC_PASSWORD'
-                }
-            }
-        }
-
-        self.assertIn(username_env, pod.init_containers[0]["env"],
-                      'The username env for git credentials did not get into the init container')
-
-        self.assertIn(password_env, pod.init_containers[0]["env"],
-                      'The password env for git credentials did not get into the init container')
-
-    def test_make_pod_git_sync_rev(self):
-        # Tests that the pod created with git_sync_rev gets the GIT_SYNC_REV env in the init container
-        self.kube_config.git_sync_rev = 'sampletag'
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-        self.kube_config.worker_fs_group = None
-        self.kube_config.git_dags_folder_mount_point = 'dags'
-        self.kube_config.git_sync_dest = 'repo'
-        self.kube_config.git_subpath = 'path'
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        rev_env = {
-            'name': 'GIT_SYNC_REV',
-            'value': self.kube_config.git_sync_rev
-        }
-
-        self.assertIn(rev_env, pod.init_containers[0]["env"],
-                      'The git_sync_rev env did not get into the init container')
-
-    def test_init_environment_using_git_sync_run_as_user_empty(self):
-        # Tests that if git_sync_run_as_user is empty, no securityContext is created in the init container
-
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-        self.kube_config.git_sync_run_as_user = ''
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        init_containers = worker_config._get_init_containers()
-        self.assertTrue(init_containers)  # check not empty
-
-        self.assertNotIn(
-            'securityContext', init_containers[0],
-            "securityContext shouldn't be defined"
-        )
-
-    def test_make_pod_run_as_user_0(self):
-        # Tests that the pod created with run-as-user 0 actually gets that in its config
-        self.kube_config.worker_run_as_user = 0
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-        self.kube_config.worker_fs_group = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        self.assertEqual(0, pod.security_context['runAsUser'])
-
-    def test_make_pod_assert_labels(self):
-        # Tests that the created pod has all the expected labels set
-        self.kube_config.dags_folder = 'dags'
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-        pod = worker_config.make_pod("default", "sample-uuid", "test_pod_id", "test_dag_id",
-                                     "test_task_id", "2019-11-21 11:08:22.920875", 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-        expected_labels = {
-            'airflow-worker': 'sample-uuid',
-            'airflow_version': airflow_version.replace('+', '-'),
-            'dag_id': 'test_dag_id',
-            'execution_date': '2019-11-21 11:08:22.920875',
-            'kubernetes_executor': 'True',
-            'my_label': 'label_id',
-            'task_id': 'test_task_id',
-            'try_number': '1'
-        }
-        self.assertEqual(pod.labels, expected_labels)
-
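The `airflow_version.replace('+', '-')` in the expected labels is needed because
Kubernetes label values may only contain alphanumerics plus '-', '_' and '.', so a
PEP 440 local version such as '1.10.12+customized' is not a legal label value. A
self-contained check of that constraint (the regex mirrors the Kubernetes
label-value pattern; `safe_label_value` is our illustrative name):

    import re

    # Label values: at most 63 chars, alphanumeric at both ends, [-_.] inside.
    LABEL_VALUE_RE = re.compile(r'^[A-Za-z0-9]([A-Za-z0-9._-]*[A-Za-z0-9])?$')

    def safe_label_value(version):
        return version.replace('+', '-')

    assert not LABEL_VALUE_RE.match('1.10.12+customized')
    assert LABEL_VALUE_RE.match(safe_label_value('1.10.12+customized'))
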
-    def test_make_pod_git_sync_ssh_without_known_hosts(self):
-        # Tests that the pod created with the git-sync SSH authentication option is correct without known hosts
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-        self.kube_config.worker_fs_group = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        init_containers = worker_config._get_init_containers()
-        git_ssh_key_file = next((x['value'] for x in init_containers[0]['env']
-                                 if x['name'] == 'GIT_SSH_KEY_FILE'), None)
-        volume_mount_ssh_key = next((x['mountPath'] for x in init_containers[0]['volumeMounts']
-                                     if x['name'] == worker_config.git_sync_ssh_secret_volume_name),
-                                    None)
-        self.assertTrue(git_ssh_key_file)
-        self.assertTrue(volume_mount_ssh_key)
-        self.assertEqual(65533, pod.security_context['fsGroup'])
-        self.assertEqual(git_ssh_key_file,
-                         volume_mount_ssh_key,
-                         'The location where the git ssh secret is mounted'
-                         ' needs to be the same as the GIT_SSH_KEY_FILE path')
-
-    def test_make_pod_git_sync_ssh_with_known_hosts(self):
-        # Tests that the pod created with the git-sync SSH authentication option is correct with known hosts
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_ssh_secret_name = 'airflow-secrets'
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-
-        init_containers = worker_config._get_init_containers()
-        git_ssh_known_hosts_file = next((x['value'] for x in init_containers[0]['env']
-                                         if x['name'] == 'GIT_SSH_KNOWN_HOSTS_FILE'), None)
-
-        volume_mount_ssh_known_hosts_file = next(
-            (x['mountPath'] for x in init_containers[0]['volumeMounts']
-             if x['name'] == worker_config.git_sync_ssh_known_hosts_volume_name),
-            None)
-        self.assertTrue(git_ssh_known_hosts_file)
-        self.assertTrue(volume_mount_ssh_known_hosts_file)
-        self.assertEqual(git_ssh_known_hosts_file,
-                         volume_mount_ssh_known_hosts_file,
-                         'The location where the git known hosts file is mounted'
-                         ' needs to be the same as the GIT_SSH_KNOWN_HOSTS_FILE path')
-
-    def test_make_pod_with_empty_executor_config(self):
-        self.kube_config.kube_affinity = self.affinity_config
-        self.kube_config.kube_tolerations = self.tolerations_config
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[]
-                                                        )
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        self.assertIsNotNone(pod.affinity['podAntiAffinity'])
-        self.assertEqual('app',
-                         pod.affinity['podAntiAffinity']
-                         ['requiredDuringSchedulingIgnoredDuringExecution'][0]
-                         ['labelSelector']
-                         ['matchExpressions'][0]
-                         ['key'])
-
-        self.assertEqual(2, len(pod.tolerations))
-        self.assertEqual('prod', pod.tolerations[1]['key'])
-
-    def test_make_pod_with_executor_config(self):
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(affinity=self.affinity_config,
-                                                        tolerations=self.tolerations_config,
-                                                        annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[]
-                                                        )
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        self.assertIsNotNone(pod.affinity['podAntiAffinity'])
-        self.assertEqual('app',
-                         pod.affinity['podAntiAffinity']
-                         ['requiredDuringSchedulingIgnoredDuringExecution'][0]
-                         ['labelSelector']
-                         ['matchExpressions'][0]
-                         ['key'])
-
-        self.assertEqual(2, len(pod.tolerations))
-        self.assertEqual('prod', pod.tolerations[1]['key'])
-
-    def test_worker_pvc_dags(self):
-        # Tests that the persistent volume config is created when `dags_volume_claim` is set
-        self.kube_config.dags_volume_claim = 'airflow-dags'
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        volumes, volume_mounts = worker_config._get_volumes_and_mounts()
-
-        init_containers = worker_config._get_init_containers()
-
-        dag_volume = [volume for volume in volumes.values() if volume['name'] == 'airflow-dags']
-        dag_volume_mount = [mount for mount in volume_mounts.values() if mount['name'] == 'airflow-dags']
-
-        self.assertEqual('airflow-dags', dag_volume[0]['persistentVolumeClaim']['claimName'])
-        self.assertEqual(1, len(dag_volume_mount))
-        self.assertTrue(dag_volume_mount[0]['readOnly'])
-        self.assertEqual(0, len(init_containers))
-
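For orientation, this is roughly what the 'airflow-dags' volume and mount inspected
above look like when expressed with the typed kubernetes client models this refactor
moves toward (a sketch built from the test's values, not code from the patch):

    from kubernetes import client

    dag_volume = client.V1Volume(
        name='airflow-dags',
        persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
            claim_name='airflow-dags'),
    )
    dag_volume_mount = client.V1VolumeMount(
        name='airflow-dags',
        mount_path='/usr/local/airflow/dags',  # assumed dags_folder
        read_only=True,
    )
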
-    def test_worker_git_dags(self):
-        # Tests that the persistent volume config is created when `git_repo` is set
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_folder = '/usr/local/airflow/dags'
-        self.kube_config.worker_dags_folder = '/usr/local/airflow/dags'
-
-        self.kube_config.git_sync_container_repository = 'gcr.io/google-containers/git-sync-amd64'
-        self.kube_config.git_sync_container_tag = 'v2.0.5'
-        self.kube_config.git_sync_container = 'gcr.io/google-containers/git-sync-amd64:v2.0.5'
-        self.kube_config.git_sync_init_container_name = 'git-sync-clone'
-        self.kube_config.git_subpath = 'dags_folder'
-        self.kube_config.git_sync_root = '/git'
-        self.kube_config.git_sync_run_as_user = 65533
-        self.kube_config.git_dags_folder_mount_point = '/usr/local/airflow/dags/repo/dags_folder'
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        volumes, volume_mounts = worker_config._get_volumes_and_mounts()
-
-        dag_volume = [volume for volume in volumes.values() if volume['name'] == 'airflow-dags']
-        dag_volume_mount = [mount for mount in volume_mounts.values() if mount['name'] == 'airflow-dags']
-
-        self.assertTrue('emptyDir' in dag_volume[0])
-        self.assertEqual(self.kube_config.git_dags_folder_mount_point, dag_volume_mount[0]['mountPath'])
-        self.assertTrue(dag_volume_mount[0]['readOnly'])
-
-        init_container = worker_config._get_init_containers()[0]
-        init_container_volume_mount = [mount for mount in init_container['volumeMounts']
-                                       if mount['name'] == 'airflow-dags']
-
-        self.assertEqual('git-sync-clone', init_container['name'])
-        self.assertEqual('gcr.io/google-containers/git-sync-amd64:v2.0.5', init_container['image'])
-        self.assertEqual(1, len(init_container_volume_mount))
-        self.assertFalse(init_container_volume_mount[0]['readOnly'])
-        self.assertEqual(65533, init_container['securityContext']['runAsUser'])
-
-    def test_worker_container_dags(self):
-        # Tests that the 'airflow-dags' persistent volume is NOT created when `dags_in_image` is set
-        self.kube_config.dags_in_image = True
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        volumes, volume_mounts = worker_config._get_volumes_and_mounts()
-
-        dag_volume = [volume for volume in volumes.values() if volume['name'] == 'airflow-dags']
-        dag_volume_mount = [mount for mount in volume_mounts.values() if mount['name'] == 'airflow-dags']
-
-        init_containers = worker_config._get_init_containers()
-
-        self.assertEqual(0, len(dag_volume))
-        self.assertEqual(0, len(dag_volume_mount))
-        self.assertEqual(0, len(init_containers))
-
-    def test_set_airflow_config_configmap(self):
-        """
-        Test that airflow.cfg can be set via configmap by
-        checking volume & volume-mounts are set correctly.
-        """
-        self.kube_config.airflow_home = '/usr/local/airflow'
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.airflow_local_settings_configmap = None
-        self.kube_config.dags_folder = '/workers/path/to/dags'
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        airflow_config_volume = [
-            volume for volume in pod.volumes if volume["name"] == 'airflow-config'
-        ]
-        # Test that volume_name is found
-        self.assertEqual(1, len(airflow_config_volume))
-
-        # Test that config map exists
-        self.assertEqual(
-            {'configMap': {'name': 'airflow-configmap'}, 'name': 'airflow-config'},
-            airflow_config_volume[0]
-        )
-
-        # Test that only 1 volume mount exists with the 'airflow-config' name,
-        # the one for airflow.cfg
-        volume_mounts = [
-            volume_mount for volume_mount in pod.volume_mounts
-            if volume_mount['name'] == 'airflow-config'
-        ]
-
-        self.assertEqual([
-            {
-                'mountPath': '/usr/local/airflow/airflow.cfg',
-                'name': 'airflow-config',
-                'readOnly': True,
-                'subPath': 'airflow.cfg',
-            }
-        ],
-            volume_mounts
-        )
-
-    def test_set_airflow_local_settings_configmap(self):
-        """
-        Test that airflow_local_settings.py can be set via configmap by
-        checking volume & volume-mounts are set correctly.
-        """
-        self.kube_config.airflow_home = '/usr/local/airflow'
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.airflow_local_settings_configmap = 'airflow-configmap'
-        self.kube_config.dags_folder = '/workers/path/to/dags'
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        airflow_config_volume = [
-            volume for volume in pod.volumes if volume["name"] == 'airflow-config'
-        ]
-        # Test that volume_name is found
-        self.assertEqual(1, len(airflow_config_volume))
-
-        # Test that config map exists
-        self.assertEqual(
-            {'configMap': {'name': 'airflow-configmap'}, 'name': 'airflow-config'},
-            airflow_config_volume[0]
-        )
-
-        # Test that 2 volume mounts exist with 2 different mount paths:
-        # one for airflow.cfg,
-        # a second for airflow_local_settings.py
-        volume_mounts = [
-            volume_mount for volume_mount in pod.volume_mounts
-            if volume_mount['name'] == 'airflow-config'
-        ]
-        self.assertEqual(2, len(volume_mounts))
-
-        six.assertCountEqual(
-            self,
-            [
-                {
-                    'mountPath': '/usr/local/airflow/airflow.cfg',
-                    'name': 'airflow-config',
-                    'readOnly': True,
-                    'subPath': 'airflow.cfg',
-                },
-                {
-                    'mountPath': '/usr/local/airflow/config/airflow_local_settings.py',
-                    'name': 'airflow-config',
-                    'readOnly': True,
-                    'subPath': 'airflow_local_settings.py',
-                }
-            ],
-            volume_mounts
-        )
-
-    def test_set_airflow_configmap_different_for_local_setting(self):
-        """
-        Test that airflow_local_settings.py can be set via a configmap separate
-        from the airflow.cfg configmap, by checking volume & volume-mounts are set correctly.
-        """
-        self.kube_config.airflow_home = '/usr/local/airflow'
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.airflow_local_settings_configmap = 'airflow-ls-configmap'
-        self.kube_config.dags_folder = '/workers/path/to/dags'
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        airflow_local_settings_volume = [
-            volume for volume in pod.volumes if volume["name"] == 'airflow-local-settings'
-        ]
-        # Test that volume_name is found
-        self.assertEqual(1, len(airflow_local_settings_volume))
-
-        # Test that config map exists
-        self.assertEqual(
-            [{'configMap': {'name': 'airflow-ls-configmap'}, 'name': 'airflow-local-settings'}],
-            airflow_local_settings_volume
-        )
-
-        # Test that 2 volume mounts exist with 2 different mount paths:
-        # one for airflow.cfg,
-        # a second for airflow_local_settings.py
-        airflow_cfg_volume_mount = [
-            volume_mount for volume_mount in pod.volume_mounts
-            if volume_mount['name'] == 'airflow-config'
-        ]
-
-        local_setting_volume_mount = [
-            volume_mount for volume_mount in pod.volume_mounts
-            if volume_mount['name'] == 'airflow-local-settings'
-        ]
-        self.assertEqual(1, len(airflow_cfg_volume_mount))
-        self.assertEqual(1, len(local_setting_volume_mount))
-
-        self.assertEqual(
-            [
-                {
-                    'mountPath': '/usr/local/airflow/config/airflow_local_settings.py',
-                    'name': 'airflow-local-settings',
-                    'readOnly': True,
-                    'subPath': 'airflow_local_settings.py',
-                }
-            ],
-            local_setting_volume_mount
-        )
-
-        self.assertEqual(
-            [
-                {
-                    'mountPath': '/usr/local/airflow/airflow.cfg',
-                    'name': 'airflow-config',
-                    'readOnly': True,
-                    'subPath': 'airflow.cfg',
-                }
-            ],
-            airflow_cfg_volume_mount
-        )
-
-    def test_kubernetes_environment_variables(self):
-        # Tests that the kubernetes environment variables get copied into the worker pods
-        input_environment = {
-            'ENVIRONMENT': 'prod',
-            'LOG_LEVEL': 'warning'
-        }
-        self.kube_config.kube_env_vars = input_environment
-        worker_config = WorkerConfiguration(self.kube_config)
-        env = worker_config._get_environment()
-        for key in input_environment:
-            self.assertIn(key, env)
-            self.assertIn(input_environment[key], env.values())
-
-        core_executor = 'AIRFLOW__CORE__EXECUTOR'
-        input_environment = {
-            core_executor: 'NotLocalExecutor'
-        }
-        self.kube_config.kube_env_vars = input_environment
-        worker_config = WorkerConfiguration(self.kube_config)
-        env = worker_config._get_environment()
-        self.assertEqual(env[core_executor], 'LocalExecutor')
-
-    def test_kubernetes_environment_variables_for_init_container(self):
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-
-        # Tests that the kubernetes environment variables get copied into the worker pods
-        input_environment = {
-            'ENVIRONMENT': 'prod',
-            'LOG_LEVEL': 'warning'
-        }
-        self.kube_config.kube_env_vars = input_environment
-        worker_config = WorkerConfiguration(self.kube_config)
-        env = worker_config._get_environment()
-        for key in input_environment:
-            self.assertIn(key, env)
-            self.assertIn(input_environment[key], env.values())
-
-        init_containers = worker_config._get_init_containers()
-
-        self.assertTrue(init_containers)  # check not empty
-        env = init_containers[0]['env']
-
-        self.assertTrue({'name': 'ENVIRONMENT', 'value': 'prod'} in env)
-        self.assertTrue({'name': 'LOG_LEVEL', 'value': 'warning'} in env)
-
-    def test_get_secrets(self):
-        # Test when secretRef is None and kube_secrets is not empty
-        self.kube_config.kube_secrets = {
-            'AWS_SECRET_KEY': 'airflow-secret=aws_secret_key',
-            'POSTGRES_PASSWORD': 'airflow-secret=postgres_credentials'
-        }
-        self.kube_config.env_from_secret_ref = None
-        worker_config = WorkerConfiguration(self.kube_config)
-        secrets = worker_config._get_secrets()
-        secrets.sort(key=lambda secret: secret.deploy_target)
-        expected = [
-            Secret('env', 'AWS_SECRET_KEY', 'airflow-secret', 'aws_secret_key'),
-            Secret('env', 'POSTGRES_PASSWORD', 'airflow-secret', 'postgres_credentials')
-        ]
-        self.assertListEqual(expected, secrets)
-
-        # Test when env_from_secret_ref is not empty and kube_secrets is an empty dict
-        self.kube_config.kube_secrets = {}
-        self.kube_config.env_from_secret_ref = 'secret_a,secret_b'
-        worker_config = WorkerConfiguration(self.kube_config)
-        secrets = worker_config._get_secrets()
-        expected = [
-            Secret('env', None, 'secret_a'),
-            Secret('env', None, 'secret_b')
-        ]
-        self.assertListEqual(expected, secrets)
-
-    def test_get_configmaps(self):
-        # Test when configmap is empty
-        self.kube_config.env_from_configmap_ref = ''
-        worker_config = WorkerConfiguration(self.kube_config)
-        configmaps = worker_config._get_configmaps()
-        self.assertListEqual([], configmaps)
-
-        # Test when configmap is not empty
-        self.kube_config.env_from_configmap_ref = 'configmap_a,configmap_b'
-        worker_config = WorkerConfiguration(self.kube_config)
-        configmaps = worker_config._get_configmaps()
-        self.assertListEqual(['configmap_a', 'configmap_b'], configmaps)
-
-    def test_get_labels(self):
-        worker_config = WorkerConfiguration(self.kube_config)
-        labels = worker_config._get_labels({'my_kube_executor_label': 'kubernetes'}, {
-            'dag_id': 'override_dag_id',
-        })
-        self.assertEqual({
-            'my_label': 'label_id',
-            'dag_id': 'override_dag_id',
-            'my_kube_executor_label': 'kubernetes',
-        }, labels)
-
-
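The expected dict in test_get_labels implies a layered merge in which later sources
override earlier ones: static config labels first, then executor-config labels, then
per-task labels. A minimal sketch of that merge order (`merge_labels` is an
illustrative helper, not the `_get_labels` implementation):

    def merge_labels(*label_sources):
        # Later sources win, matching the dag_id override asserted above.
        merged = {}
        for source in label_sources:
            merged.update(source or {})
        return merged

    merge_labels({'my_label': 'label_id'},
                 {'my_kube_executor_label': 'kubernetes'},
                 {'dag_id': 'override_dag_id'})
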
 class TestKubernetesExecutor(unittest.TestCase):
     """
     Tests if an ApiException from the Kube Client will cause the task to
@@ -1001,6 +145,9 @@ class TestKubernetesExecutor(unittest.TestCase):
         mock_kube_client.create_namespaced_pod = mock.MagicMock(
             side_effect=ApiException(http_resp=r))
         mock_get_kube_client.return_value = mock_kube_client
+        mock_api_client = mock.MagicMock()
+        mock_api_client.sanitize_for_serialization.return_value = {}
+        mock_kube_client.api_client = mock_api_client
 
         kubernetesExecutor = KubernetesExecutor()
         kubernetesExecutor.start()
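
The mock added above stubs `sanitize_for_serialization` because, with the k8s API
models, pod specs are typed `kubernetes.client` objects that the executor evidently
runs through the API client before use, so tests must stub that call. A minimal
sketch of what it does against the real client (values are illustrative):

    from kubernetes import client

    pod = client.V1Pod(
        metadata=client.V1ObjectMeta(name='example-pod'),
        spec=client.V1PodSpec(
            containers=[client.V1Container(name='base', image='busybox')]),
    )
    # Converts the typed model into the plain dict used for the API request body.
    as_dict = client.ApiClient().sanitize_for_serialization(pod)
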
@@ -1045,10 +192,12 @@ class TestKubernetesExecutor(unittest.TestCase):
         executor._change_state(key, State.RUNNING, 'pod_id', 'default')
         self.assertTrue(executor.event_buffer[key] == State.RUNNING)
 
+    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
-    def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher,
+                                  mock_kube_config):
         executor = KubernetesExecutor()
         executor.start()
         test_time = timezone.utcnow()
@@ -1057,11 +206,12 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
         mock_delete_pod.assert_called_once_with('pod_id', 'default')
 
+    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
-    def test_change_state_failed_no_deletion(self, mock_delete_pod, mock_get_kube_client,
-                                             mock_kubernetes_job_watcher):
+    def test_change_state_failed(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher,
+                                 mock_kube_config):
         executor = KubernetesExecutor()
         executor.kube_config.delete_worker_pods = False
         executor.kube_config.delete_worker_pods_on_failure = False
@@ -1072,11 +222,12 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.assertTrue(executor.event_buffer[key] == State.FAILED)
         mock_delete_pod.assert_not_called()
 
+    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
     def test_change_state_skip_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
-                                            mock_kubernetes_job_watcher):
+                                            mock_kubernetes_job_watcher, mock_kube_config):
         test_time = timezone.utcnow()
         executor = KubernetesExecutor()
         executor.kube_config.delete_worker_pods = False
@@ -1087,11 +238,12 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
         mock_delete_pod.assert_not_called()
 
+    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
     def test_change_state_failed_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
-                                              mock_kubernetes_job_watcher):
+                                              mock_kubernetes_job_watcher, mock_kube_config):
         executor = KubernetesExecutor()
         executor.kube_config.delete_worker_pods_on_failure = True
 
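A note on the signatures changed in the hunks above: stacked `@mock.patch`
decorators apply bottom-up, so the decorator nearest the function supplies the first
mock argument, and the newly added `KubeConfig` patch, sitting on top, lands in the
last parameter. A minimal standalone illustration (the patched targets here are
arbitrary):

    import os
    import unittest
    from unittest import mock

    class ExampleOrdering(unittest.TestCase):
        @mock.patch('os.getcwd')   # topmost decorator -> last mock argument
        @mock.patch('os.listdir')  # nearest the function -> first mock argument
        def test_order(self, mock_listdir, mock_getcwd):
            mock_getcwd.return_value = '/tmp'
            self.assertEqual(os.getcwd(), '/tmp')
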
diff --git a/tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py b/tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
deleted file mode 100644
index 87d0073..0000000
--- a/tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
+++ /dev/null
@@ -1,396 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from airflow.kubernetes.kubernetes_request_factory.kubernetes_request_factory import KubernetesRequestFactory
-from airflow.kubernetes.pod import Pod, Resources
-from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
-from airflow.kubernetes.secret import Secret
-from parameterized import parameterized
-import unittest
-import copy
-
-
-class TestKubernetesRequestFactory(unittest.TestCase):
-
-    def setUp(self):
-
-        self.expected = {
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {
-                'name': 'name'
-            },
-            'spec': {
-                'containers': [{
-                    'name': 'base',
-                    'image': 'airflow-worker:latest',
-                    'command': [
-                        "/usr/local/airflow/entrypoint.sh",
-                        "/bin/bash sleep 25"
-                    ],
-                }],
-                'restartPolicy': 'Never'
-            }
-        }
-        self.input_req = copy.deepcopy(self.expected)
-
-    def test_extract_image(self):
-        image = 'v3.14'
-        pod = Pod(image, {}, [])
-        KubernetesRequestFactory.extract_image(pod, self.input_req)
-        self.expected['spec']['containers'][0]['image'] = image
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_image_pull_policy(self):
-        # Test when pull policy is not none
-        pull_policy = 'IfNotPresent'
-        pod = Pod('v3.14', {}, [], image_pull_policy=pull_policy)
-
-        KubernetesRequestFactory.extract_image_pull_policy(pod, self.input_req)
-        self.expected['spec']['containers'][0]['imagePullPolicy'] = pull_policy
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_add_secret_to_env(self):
-        secret = Secret('env', 'target', 'my-secret', 'KEY')
-        secret_list = []
-        self.expected = [{
-            'name': 'TARGET',
-            'valueFrom': {
-                'secretKeyRef': {
-                    'name': 'my-secret',
-                    'key': 'KEY'
-                }
-            }
-        }]
-        KubernetesRequestFactory.add_secret_to_env(secret_list, secret)
-        self.assertListEqual(secret_list, self.expected)
-
-    def test_extract_labels(self):
-        # Test when labels are not empty
-        labels = {'label_a': 'val_a', 'label_b': 'val_b'}
-        pod = Pod('v3.14', {}, [], labels=labels)
-        self.expected['metadata']['labels'] = labels
-        KubernetesRequestFactory.extract_labels(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_annotations(self):
-        # Test when annotations are not empty
-        annotations = {'annot_a': 'val_a', 'annot_b': 'val_b'}
-        pod = Pod('v3.14', {}, [], annotations=annotations)
-        self.expected['metadata']['annotations'] = annotations
-        KubernetesRequestFactory.extract_annotations(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_affinity(self):
-        # Test when affinity is not empty
-        affinity = {'podAffinity': 'requiredDuringSchedulingIgnoredDuringExecution'}
-        pod = Pod('v3.14', {}, [], affinity=affinity)
-        self.expected['spec']['affinity'] = affinity
-        KubernetesRequestFactory.extract_affinity(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_node_selector(self):
-        # Test when node selectors are not empty
-        node_selectors = {'disktype': 'ssd', 'accelerator': 'nvidia-tesla-p100'}
-        pod = Pod('v3.14', {}, [], node_selectors=node_selectors)
-        self.expected['spec']['nodeSelector'] = node_selectors
-        KubernetesRequestFactory.extract_node_selector(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_cmds(self):
-        cmds = ['test-cmd.sh']
-        pod = Pod('v3.14', {}, cmds)
-        KubernetesRequestFactory.extract_cmds(pod, self.input_req)
-        self.expected['spec']['containers'][0]['command'] = cmds
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_args(self):
-        args = ['test_arg.sh']
-        pod = Pod('v3.14', {}, [], args=args)
-        KubernetesRequestFactory.extract_args(pod, self.input_req)
-        self.expected['spec']['containers'][0]['args'] = args
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_attach_volumes(self):
-        # Test when volumes is not an empty list
-        volumes = ['vol_a', 'vol_b']
-        pod = Pod('v3.14', {}, [], volumes=volumes)
-        self.expected['spec']['volumes'] = volumes
-        KubernetesRequestFactory.attach_volumes(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_attach_volume_mounts(self):
-        # Test when volume_mounts is not an empty list
-        volume_mounts = ['vol_a', 'vol_b']
-        pod = Pod('v3.14', {}, [], volume_mounts=volume_mounts)
-        self.expected['spec']['containers'][0]['volumeMounts'] = volume_mounts
-        KubernetesRequestFactory.attach_volume_mounts(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_name(self):
-        name = 'pod-name'
-        pod = Pod('v3.14', {}, [], name=name)
-        self.expected['metadata']['name'] = name
-        KubernetesRequestFactory.extract_name(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_volume_secrets(self):
-        # Test when secrets is not empty
-        secrets = [
-            Secret('volume', 'KEY1', 's1', 'key-1'),
-            Secret('env', 'KEY2', 's2'),
-            Secret('volume', 'KEY3', 's3', 'key-2')
-        ]
-        pod = Pod('v3.14', {}, [], secrets=secrets)
-        self.expected['spec']['containers'][0]['volumeMounts'] = [{
-            'mountPath': 'KEY1',
-            'name': 'secretvol0',
-            'readOnly': True
-        }, {
-            'mountPath': 'KEY3',
-            'name': 'secretvol1',
-            'readOnly': True
-        }]
-        self.expected['spec']['volumes'] = [{
-            'name': 'secretvol0',
-            'secret': {
-                'secretName': 's1'
-            }
-        }, {
-            'name': 'secretvol1',
-            'secret': {
-                'secretName': 's3'
-            }
-        }]
-        KubernetesRequestFactory.extract_volume_secrets(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_env_and_secrets(self):
-        # Test when secrets and envs are not empty
-        secrets = [
-            Secret('env', None, 's1'),
-            Secret('volume', 'KEY2', 's2', 'key-2'),
-            Secret('env', None, 's3')
-        ]
-        envs = {
-            'ENV1': 'val1',
-            'ENV2': 'val2'
-        }
-        configmaps = ['configmap_a', 'configmap_b']
-        pod_runtime_envs = [PodRuntimeInfoEnv("ENV3", "status.podIP")]
-        pod = Pod(
-            image='v3.14',
-            envs=envs,
-            cmds=[],
-            secrets=secrets,
-            configmaps=configmaps,
-            pod_runtime_info_envs=pod_runtime_envs)
-        self.expected['spec']['containers'][0]['env'] = [
-            {'name': 'ENV1', 'value': 'val1'},
-            {'name': 'ENV2', 'value': 'val2'},
-            {
-                'name': 'ENV3',
-                'valueFrom': {
-                    'fieldRef': {
-                        'fieldPath': 'status.podIP'
-                    }
-                }
-            }
-        ]
-        self.expected['spec']['containers'][0]['envFrom'] = [{
-            'secretRef': {
-                'name': 's1'
-            }
-        }, {
-            'secretRef': {
-                'name': 's3'
-            }
-        }, {
-            'configMapRef': {
-                'name': 'configmap_a'
-            }
-        }, {
-            'configMapRef': {
-                'name': 'configmap_b'
-            }
-        }]
-
-        KubernetesRequestFactory.extract_env_and_secrets(pod, self.input_req)
-        self.input_req['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_env_and_secret_order(self):
-        envs = {
-            'ENV': 'val1',
-        }
-        pod_runtime_envs = [PodRuntimeInfoEnv('RUNTIME_ENV', 'status.podIP')]
-        pod = Pod(
-            image='v3.14',
-            envs=envs,
-            cmds=[],
-            pod_runtime_info_envs=pod_runtime_envs)
-        self.expected['spec']['containers'][0]['env'] = [
-            {
-                'name': 'RUNTIME_ENV',
-                'valueFrom': {
-                    'fieldRef': {
-                        'fieldPath': 'status.podIP'
-                    }
-                }
-            },
-            {'name': 'ENV', 'value': 'val1'}
-        ]
-        KubernetesRequestFactory.extract_env_and_secrets(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_resources(self):
-        # Test when resources is not empty
-        resources = Resources(
-            request_memory='1Gi',
-            request_cpu=1,
-            limit_memory='2Gi',
-            limit_cpu=2)
-
-        pod = Pod('v3.14', {}, [], resources=resources)
-        self.expected['spec']['containers'][0]['resources'] = {
-            'requests': {
-                'memory': '1Gi',
-                'cpu': 1
-            },
-            'limits': {
-                'memory': '2Gi',
-                'cpu': 2
-            },
-        }
-        KubernetesRequestFactory.extract_resources(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_display_resources(self):
-        resources_string = str(Resources('1Gi', 1))
-        self.assertEqual(
-            resources_string,
-            "Request: [cpu: 1, memory: 1Gi], Limit: [cpu: None, memory: None, gpu: None]")
-
-    def test_extract_limits(self):
-        # Test when resources is not empty
-        resources = Resources(
-            limit_memory='1Gi',
-            limit_cpu=1)
-
-        pod = Pod('v3.14', {}, [], resources=resources)
-        self.expected['spec']['containers'][0]['resources'] = {
-            'limits': {
-                'memory': '1Gi',
-                'cpu': 1
-            }
-        }
-        KubernetesRequestFactory.extract_resources(pod, self.input_req)
-        self.assertEqual(self.expected, self.input_req)
-
-    def test_extract_all_resources(self):
-        # Test when resources is not empty
-        resources = Resources(
-            request_memory='1Gi',
-            request_cpu=1,
-            limit_memory='2Gi',
-            limit_cpu=2,
-            limit_gpu=3)
-
-        pod = Pod('v3.14', {}, [], resources=resources)
-        self.expected['spec']['containers'][0]['resources'] = {
-            'requests': {
-                'memory': '1Gi',
-                'cpu': 1
-            },
-            'limits': {
-                'memory': '2Gi',
-                'cpu': 2,
-                'nvidia.com/gpu': 3
-            }
-        }
-        KubernetesRequestFactory.extract_resources(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_init_containers(self):
-        init_container = 'init_container'
-        pod = Pod('v3.14', {}, [], init_containers=init_container)
-        self.expected['spec']['initContainers'] = init_container
-        KubernetesRequestFactory.extract_init_containers(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_service_account_name(self):
-        service_account_name = 'service_account_name'
-        pod = Pod('v3.14', {}, [], service_account_name=service_account_name)
-        self.expected['spec']['serviceAccountName'] = service_account_name
-        KubernetesRequestFactory.extract_service_account_name(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_hostnetwork(self):
-        hostnetwork = True
-        pod = Pod('v3.14', {}, [], hostnetwork=hostnetwork)
-        self.expected['spec']['hostNetwork'] = hostnetwork
-        KubernetesRequestFactory.extract_hostnetwork(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_image_pull_secrets(self):
-        image_pull_secrets = 'secret_a,secret_b,secret_c'
-        pod = Pod('v3.14', {}, [], image_pull_secrets=image_pull_secrets)
-        self.expected['spec']['imagePullSecrets'] = [
-            {'name': 'secret_a'},
-            {'name': 'secret_b'},
-            {'name': 'secret_c'},
-        ]
-        KubernetesRequestFactory.extract_image_pull_secrets(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_tolerations(self):
-        tolerations = [{
-            'key': 'key',
-            'operator': 'Equal',
-            'value': 'value',
-            'effect': 'NoSchedule'
-        }]
-        pod = Pod('v3.14', {}, [], tolerations=tolerations)
-        self.expected['spec']['tolerations'] = tolerations
-        KubernetesRequestFactory.extract_tolerations(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_security_context(self):
-        security_context = {
-            'runAsUser': 1000,
-            'fsGroup': 2000
-        }
-        pod = Pod('v3.14', {}, [], security_context=security_context)
-        self.expected['spec']['securityContext'] = security_context
-        KubernetesRequestFactory.extract_security_context(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    @parameterized.expand([
-        'extract_resources',
-        'extract_init_containers',
-        'extract_service_account_name',
-        'extract_hostnetwork',
-        'extract_image_pull_secrets',
-        'extract_tolerations',
-        'extract_security_context',
-        'extract_volume_secrets'
-    ])
-    def test_identity(self, name):
-        kube_request_factory_func = getattr(KubernetesRequestFactory, name)
-        pod = Pod('v3.14', {}, [])
-        kube_request_factory_func(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
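The factory tests deleted above build raw dicts such as the secretKeyRef env entry
in test_add_secret_to_env. For comparison, the same entry expressed with the typed
kubernetes client models that replace the request factories (a sketch reusing the
test's values, not code from the patch):

    from kubernetes import client

    env_var = client.V1EnvVar(
        name='TARGET',
        value_from=client.V1EnvVarSource(
            secret_key_ref=client.V1SecretKeySelector(
                name='my-secret', key='KEY')),
    )
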
diff --git a/tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py b/tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
deleted file mode 100644
index 68617de..0000000
--- a/tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
+++ /dev/null
@@ -1,175 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from airflow.kubernetes.kubernetes_request_factory.pod_request_factory import SimplePodRequestFactory, \
-    ExtractXcomPodRequestFactory
-from airflow.kubernetes.pod import Pod, Resources
-from airflow.kubernetes.secret import Secret
-from airflow.exceptions import AirflowConfigException
-import unittest
-
-XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 30; done;'
-
-
-class TestPodRequestFactory(unittest.TestCase):
-
-    def setUp(self):
-        self.simple_pod_request_factory = SimplePodRequestFactory()
-        self.xcom_pod_request_factory = ExtractXcomPodRequestFactory()
-        self.pod = Pod(
-            image='busybox',
-            envs={
-                'ENVIRONMENT': 'prod',
-                'LOG_LEVEL': 'warning'
-            },
-            name='myapp-pod',
-            cmds=['sh', '-c', 'echo Hello Kubernetes!'],
-            labels={'app': 'myapp'},
-            image_pull_secrets='pull_secret_a,pull_secret_b',
-            configmaps=['configmap_a', 'configmap_b'],
-            ports=[{'name': 'foo', 'containerPort': 1234}],
-            resources=Resources('1Gi', 1, '2Gi', 2, 1),
-            secrets=[
-                # This should be a secretRef
-                Secret('env', None, 'secret_a'),
-                # This should be a single secret mounted in volumeMounts
-                Secret('volume', '/etc/foo', 'secret_b'),
-                # This should produce a single secret mounted in env
-                Secret('env', 'TARGET', 'secret_b', 'source_b'),
-            ],
-            security_context={
-                'runAsUser': 1000,
-                'fsGroup': 2000,
-            }
-        )
-        self.maxDiff = None
-        self.expected = {
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {
-                'name': 'myapp-pod',
-                'labels': {'app': 'myapp'},
-                'annotations': {}},
-            'spec': {
-                'containers': [{
-                    'name': 'base',
-                    'image': 'busybox',
-                    'command': [
-                        'sh', '-c', 'echo Hello Kubernetes!'
-                    ],
-                    'imagePullPolicy': 'IfNotPresent',
-                    'args': [],
-                    'env': [{
-                        'name': 'ENVIRONMENT',
-                        'value': 'prod'
-                    }, {
-                        'name': 'LOG_LEVEL',
-                        'value': 'warning'
-                    }, {
-                        'name': 'TARGET',
-                        'valueFrom': {
-                            'secretKeyRef': {
-                                'name': 'secret_b',
-                                'key': 'source_b'
-                            }
-                        }
-                    }],
-                    'envFrom': [{
-                        'secretRef': {
-                            'name': 'secret_a'
-                        }
-                    }, {
-                        'configMapRef': {
-                            'name': 'configmap_a'
-                        }
-                    }, {
-                        'configMapRef': {
-                            'name': 'configmap_b'
-                        }
-                    }],
-                    'resources': {
-                        'requests': {
-                            'memory': '1Gi',
-                            'cpu': 1
-                        },
-                        'limits': {
-                            'memory': '2Gi',
-                            'cpu': 2,
-                            'nvidia.com/gpu': 1
-                        },
-                    },
-                    'ports': [{'name': 'foo', 'containerPort': 1234}],
-                    'volumeMounts': [{
-                        'mountPath': '/etc/foo',
-                        'name': 'secretvol0',
-                        'readOnly': True
-                    }]
-                }],
-                'restartPolicy': 'Never',
-                'nodeSelector': {},
-                'volumes': [{
-                    'name': 'secretvol0',
-                    'secret': {
-                        'secretName': 'secret_b'
-                    }
-                }],
-                'imagePullSecrets': [
-                    {'name': 'pull_secret_a'},
-                    {'name': 'pull_secret_b'}
-                ],
-                'affinity': {},
-                'securityContext': {
-                    'runAsUser': 1000,
-                    'fsGroup': 2000,
-                },
-            }
-        }
-
-    def test_secret_throws(self):
-        with self.assertRaises(AirflowConfigException):
-            Secret('volume', None, 'secret_a', 'key')
-
-    def test_simple_pod_request_factory_create(self):
-        result = self.simple_pod_request_factory.create(self.pod)
-        # sort
-        result['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
-        self.assertEqual(result, self.expected)
-
-    def test_xcom_pod_request_factory_create(self):
-        result = self.xcom_pod_request_factory.create(self.pod)
-        container_two = {
-            'name': 'airflow-xcom-sidecar',
-            'image': 'alpine',
-            'command': ['sh', '-c', XCOM_CMD],
-            'volumeMounts': [
-                {
-                    'name': 'xcom',
-                    'mountPath': '/airflow/xcom'
-                }
-            ],
-            'resources': {'requests': {'cpu': '1m'}},
-        }
-        self.expected['spec']['containers'].append(container_two)
-        self.expected['spec']['containers'][0]['volumeMounts'].insert(0, {
-            'name': 'xcom',
-            'mountPath': '/airflow/xcom'
-        })
-        self.expected['spec']['volumes'].insert(0, {
-            'name': 'xcom', 'emptyDir': {}
-        })
-        result['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
-        self.assertEqual(result, self.expected)
diff --git a/tests/kubernetes/kubernetes_request_factory/__init__.py b/tests/kubernetes/models/__init__.py
similarity index 100%
rename from tests/kubernetes/kubernetes_request_factory/__init__.py
rename to tests/kubernetes/models/__init__.py
diff --git a/tests/kubernetes/models/test_pod.py b/tests/kubernetes/models/test_pod.py
new file mode 100644
index 0000000..edcd364
--- /dev/null
+++ b/tests/kubernetes/models/test_pod.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from tests.compat import mock
+from kubernetes.client import ApiClient
+import kubernetes.client.models as k8s
+from airflow.kubernetes.pod import Port
+from airflow.kubernetes.pod_generator import PodGenerator
+from airflow.kubernetes.k8s_model import append_to_pod
+
+
+class TestPod(unittest.TestCase):
+
+    def test_port_to_k8s_client_obj(self):
+        port = Port('http', 80)
+        self.assertEqual(
+            port.to_k8s_client_obj(),
+            k8s.V1ContainerPort(
+                name='http',
+                container_port=80
+            )
+        )
+
+    @mock.patch('uuid.uuid4')
+    def test_port_attach_to_pod(self, mock_uuid):
+        mock_uuid.return_value = '0'
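+        # With uuid4 mocked to '0', gen_pod() yields the deterministic pod name 'base-0'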
+        pod = PodGenerator(image='airflow-worker:latest', name='base').gen_pod()
+        ports = [
+            Port('https', 443),
+            Port('http', 80)
+        ]
+        k8s_client = ApiClient()
+        result = append_to_pod(pod, ports)
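+        # sanitize_for_serialization turns the k8s model objects into plain JSON-style dicts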
+        result = k8s_client.sanitize_for_serialization(result)
+        self.assertEqual({
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': {'name': 'base-0'},
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': [],
+                    'env': [],
+                    'envFrom': [],
+                    'image': 'airflow-worker:latest',
+                    'imagePullPolicy': 'IfNotPresent',
+                    'name': 'base',
+                    'ports': [{
+                        'name': 'https',
+                        'containerPort': 443
+                    }, {
+                        'name': 'http',
+                        'containerPort': 80
+                    }],
+                    'volumeMounts': [],
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'restartPolicy': 'Never',
+                'volumes': []
+            }
+        }, result)
diff --git a/tests/kubernetes/models/test_secret.py b/tests/kubernetes/models/test_secret.py
new file mode 100644
index 0000000..843bd79
--- /dev/null
+++ b/tests/kubernetes/models/test_secret.py
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from tests.compat import mock
+from kubernetes.client import ApiClient
+import kubernetes.client.models as k8s
+from airflow.kubernetes.secret import Secret
+from airflow.kubernetes.pod_generator import PodGenerator
+from airflow.kubernetes.k8s_model import append_to_pod
+
+
+class TestSecret(unittest.TestCase):
+
+    def test_to_env_secret(self):
+        secret = Secret('env', 'name', 'secret', 'key')
+        self.assertEqual(secret.to_env_secret(), k8s.V1EnvVar(
+            name='NAME',
+            value_from=k8s.V1EnvVarSource(
+                secret_key_ref=k8s.V1SecretKeySelector(
+                    name='secret',
+                    key='key'
+                )
+            )
+        ))
+
+    def test_to_env_from_secret(self):
+        secret = Secret('env', None, 'secret')
+        self.assertEqual(secret.to_env_from_secret(), k8s.V1EnvFromSource(
+            secret_ref=k8s.V1SecretEnvSource(name='secret')
+        ))
+
+    @mock.patch('uuid.uuid4')
+    def test_to_volume_secret(self, mock_uuid):
+        mock_uuid.return_value = '0'
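+        # The mocked uuid makes the generated volume name deterministic: 'secretvol0'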
+        secret = Secret('volume', '/etc/foo', 'secret_b')
+        self.assertEqual(secret.to_volume_secret(), (
+            k8s.V1Volume(
+                name='secretvol0',
+                secret=k8s.V1SecretVolumeSource(
+                    secret_name='secret_b'
+                )
+            ),
+            k8s.V1VolumeMount(
+                mount_path='/etc/foo',
+                name='secretvol0',
+                read_only=True
+            )
+        ))
+
+    @mock.patch('uuid.uuid4')
+    def test_attach_to_pod(self, mock_uuid):
+        mock_uuid.return_value = '0'
+        pod = PodGenerator(image='airflow-worker:latest',
+                           name='base').gen_pod()
+        secrets = [
+            # This should be a secretRef
+            Secret('env', None, 'secret_a'),
+            # This should be a single secret mounted in volumeMounts
+            Secret('volume', '/etc/foo', 'secret_b'),
+            # This should produce a single secret mounted in env
+            Secret('env', 'TARGET', 'secret_b', 'source_b'),
+        ]
+        k8s_client = ApiClient()
+        result = append_to_pod(pod, secrets)
+        result = k8s_client.sanitize_for_serialization(result)
+        self.assertEqual(result, {
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': {'name': 'base-0'},
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': [],
+                    'env': [{
+                        'name': 'TARGET',
+                        'valueFrom': {
+                            'secretKeyRef': {
+                                'key': 'source_b',
+                                'name': 'secret_b'
+                            }
+                        }
+                    }],
+                    'envFrom': [{'secretRef': {'name': 'secret_a'}}],
+                    'image': 'airflow-worker:latest',
+                    'imagePullPolicy': 'IfNotPresent',
+                    'name': 'base',
+                    'ports': [],
+                    'volumeMounts': [{
+                        'mountPath': '/etc/foo',
+                        'name': 'secretvol0',
+                        'readOnly': True}]
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'restartPolicy': 'Never',
+                'volumes': [{
+                    'name': 'secretvol0',
+                    'secret': {'secretName': 'secret_b'}
+                }]
+            }
+        })
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
new file mode 100644
index 0000000..00ad2bd
--- /dev/null
+++ b/tests/kubernetes/test_pod_generator.py
@@ -0,0 +1,328 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from tests.compat import mock
+import kubernetes.client.models as k8s
+from kubernetes.client import ApiClient
+from airflow.kubernetes.secret import Secret
+from airflow.kubernetes.pod_generator import PodGenerator, PodDefaults
+from airflow.kubernetes.pod import Resources
+from airflow.kubernetes.k8s_model import append_to_pod
+
+
+class TestPodGenerator(unittest.TestCase):
+
+    def setUp(self):
+        self.envs = {
+            'ENVIRONMENT': 'prod',
+            'LOG_LEVEL': 'warning'
+        }
+        self.secrets = [
+            # This should be a secretRef
+            Secret('env', None, 'secret_a'),
+            # This should be a single secret mounted in volumeMounts
+            Secret('volume', '/etc/foo', 'secret_b'),
+            # This should produce a single secret mounted in env
+            Secret('env', 'TARGET', 'secret_b', 'source_b'),
+        ]
+        self.resources = Resources('1Gi', 1, '2Gi', 2, 1)
+        self.k8s_client = ApiClient()
+        self.expected = {
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': {
+                'name': 'myapp-pod-0',
+                'labels': {'app': 'myapp'},
+                'namespace': 'default'
+            },
+            'spec': {
+                'containers': [{
+                    'name': 'base',
+                    'image': 'busybox',
+                    'args': [],
+                    'command': [
+                        'sh', '-c', 'echo Hello Kubernetes!'
+                    ],
+                    'imagePullPolicy': 'IfNotPresent',
+                    'env': [{
+                        'name': 'ENVIRONMENT',
+                        'value': 'prod'
+                    }, {
+                        'name': 'LOG_LEVEL',
+                        'value': 'warning'
+                    }, {
+                        'name': 'TARGET',
+                        'valueFrom': {
+                            'secretKeyRef': {
+                                'name': 'secret_b',
+                                'key': 'source_b'
+                            }
+                        }
+                    }],
+                    'envFrom': [{
+                        'configMapRef': {
+                            'name': 'configmap_a'
+                        }
+                    }, {
+                        'configMapRef': {
+                            'name': 'configmap_b'
+                        }
+                    }, {
+                        'secretRef': {
+                            'name': 'secret_a'
+                        }
+                    }],
+                    'resources': {
+                        'requests': {
+                            'memory': '1Gi',
+                            'cpu': 1
+                        },
+                        'limits': {
+                            'memory': '2Gi',
+                            'cpu': 2,
+                            'nvidia.com/gpu': 1
+                        },
+                    },
+                    'ports': [{'name': 'foo', 'containerPort': 1234}],
+                    'volumeMounts': [{
+                        'mountPath': '/etc/foo',
+                        'name': 'secretvol0',
+                        'readOnly': True
+                    }]
+                }],
+                'restartPolicy': 'Never',
+                'volumes': [{
+                    'name': 'secretvol0',
+                    'secret': {
+                        'secretName': 'secret_b'
+                    }
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [
+                    {'name': 'pull_secret_a'},
+                    {'name': 'pull_secret_b'}
+                ],
+                'securityContext': {
+                    'runAsUser': 1000,
+                    'fsGroup': 2000,
+                },
+            }
+        }
+
+    @mock.patch('uuid.uuid4')
+    def test_gen_pod(self, mock_uuid):
+        mock_uuid.return_value = '0'
+        pod_generator = PodGenerator(
+            labels={'app': 'myapp'},
+            name='myapp-pod',
+            image_pull_secrets='pull_secret_a,pull_secret_b',
+            image='busybox',
+            envs=self.envs,
+            cmds=['sh', '-c', 'echo Hello Kubernetes!'],
+            security_context=k8s.V1PodSecurityContext(
+                run_as_user=1000,
+                fs_group=2000,
+            ),
+            namespace='default',
+            ports=[k8s.V1ContainerPort(name='foo', container_port=1234)],
+            configmaps=['configmap_a', 'configmap_b']
+        )
+        result = pod_generator.gen_pod()
+        result = append_to_pod(result, self.secrets)
+        result = self.resources.attach_to_pod(result)
+        result_dict = self.k8s_client.sanitize_for_serialization(result)
+        # sort for a deterministic comparison
+        result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
+        result_dict['spec']['containers'][0]['envFrom'].sort(
+            key=lambda x: list(x.values())[0]['name']
+        )
+        self.assertDictEqual(result_dict, self.expected)
+
+    @mock.patch('uuid.uuid4')
+    def test_gen_pod_extract_xcom(self, mock_uuid):
+        mock_uuid.return_value = '0'
+        pod_generator = PodGenerator(
+            labels={'app': 'myapp'},
+            name='myapp-pod',
+            image_pull_secrets='pull_secret_a,pull_secret_b',
+            image='busybox',
+            envs=self.envs,
+            cmds=['sh', '-c', 'echo Hello Kubernetes!'],
+            namespace='default',
+            security_context=k8s.V1PodSecurityContext(
+                run_as_user=1000,
+                fs_group=2000,
+            ),
+            ports=[k8s.V1ContainerPort(name='foo', container_port=1234)],
+            configmaps=['configmap_a', 'configmap_b']
+        )
+        pod_generator.extract_xcom = True
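+        # extract_xcom adds the airflow-xcom-sidecar container and a shared 'xcom' emptyDir volume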
+        result = pod_generator.gen_pod()
+        result = append_to_pod(result, self.secrets)
+        result = self.resources.attach_to_pod(result)
+        result_dict = self.k8s_client.sanitize_for_serialization(result)
+        container_two = {
+            'name': 'airflow-xcom-sidecar',
+            'image': 'python:3.5-alpine',
+            'command': ['python', '-c', PodDefaults.XCOM_CMD],
+            'volumeMounts': [
+                {
+                    'name': 'xcom',
+                    'mountPath': '/airflow/xcom'
+                }
+            ]
+        }
+        self.expected['spec']['containers'].append(container_two)
+        self.expected['spec']['containers'][0]['volumeMounts'].insert(0, {
+            'name': 'xcom',
+            'mountPath': '/airflow/xcom'
+        })
+        self.expected['spec']['volumes'].insert(0, {
+            'name': 'xcom', 'emptyDir': {}
+        })
+        result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
+        self.assertEqual(result_dict, self.expected)
+
+    @mock.patch('uuid.uuid4')
+    def test_from_obj(self, mock_uuid):
+        mock_uuid.return_value = '0'
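+        # from_obj builds a pod from the legacy 'KubernetesExecutor' executor_config dict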
+        result = PodGenerator.from_obj({
+            "KubernetesExecutor": {
+                "annotations": {"test": "annotation"},
+                "volumes": [
+                    {
+                        "name": "example-kubernetes-test-volume",
+                        "hostPath": {"path": "/tmp/"},
+                    },
+                ],
+                "volume_mounts": [
+                    {
+                        "mountPath": "/foo/",
+                        "name": "example-kubernetes-test-volume",
+                    },
+                ],
+                "securityContext": {
+                    "runAsUser": 1000
+                }
+            }
+        })
+        result = self.k8s_client.sanitize_for_serialization(result)
+
+        self.assertEqual({
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': {
+                'annotations': {'test': 'annotation'},
+            },
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': [],
+                    'env': [],
+                    'envFrom': [],
+                    'name': 'base',
+                    'ports': [],
+                    'volumeMounts': [{
+                        'mountPath': '/foo/',
+                        'name': 'example-kubernetes-test-volume'
+                    }],
+                }],
+                'imagePullSecrets': [],
+                'volumes': [{
+                    'hostPath': {'path': '/tmp/'},
+                    'name': 'example-kubernetes-test-volume'
+                }],
+            }
+        }, result)
+
+    def test_reconcile_pods(self):
+        with mock.patch('uuid.uuid4') as mock_uuid:
+            mock_uuid.return_value = '0'
+            base_pod = PodGenerator(
+                image='image1',
+                name='name1',
+                envs={'key1': 'val1'},
+                cmds=['/bin/command1.sh', 'arg1'],
+                ports=k8s.V1ContainerPort(name='port', container_port=2118),
+                volumes=[{
+                    'hostPath': {'path': '/tmp/'},
+                    'name': 'example-kubernetes-test-volume1'
+                }],
+                volume_mounts=[{
+                    'mountPath': '/foo/',
+                    'name': 'example-kubernetes-test-volume1'
+                }],
+            ).gen_pod()
+
+            mutator_pod = PodGenerator(
+                envs={'key2': 'val2'},
+                image='',
+                name='name2',
+                cmds=['/bin/command2.sh', 'arg2'],
+                volumes=[{
+                    'hostPath': {'path': '/tmp/'},
+                    'name': 'example-kubernetes-test-volume2'
+                }],
+                volume_mounts=[{
+                    'mountPath': '/foo/',
+                    'name': 'example-kubernetes-test-volume2'
+                }]
+            ).gen_pod()
+
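+            # reconcile_pods merges the two pods: list fields such as env, volumes and
+            # volumeMounts are concatenated, and the merged pod keeps the mutator's name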
+            result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
+            result = self.k8s_client.sanitize_for_serialization(result)
+            self.assertEqual(result, {
+                'apiVersion': 'v1',
+                'kind': 'Pod',
+                'metadata': {'name': 'name2-0'},
+                'spec': {
+                    'containers': [{
+                        'args': [],
+                        'command': ['/bin/command1.sh', 'arg1'],
+                        'env': [
+                            {'name': 'key1', 'value': 'val1'},
+                            {'name': 'key2', 'value': 'val2'}
+                        ],
+                        'envFrom': [],
+                        'image': 'image1',
+                        'imagePullPolicy': 'IfNotPresent',
+                        'name': 'base',
+                        'ports': {
+                            'containerPort': 2118,
+                            'name': 'port',
+                        },
+                        'volumeMounts': [{
+                            'mountPath': '/foo/',
+                            'name': 'example-kubernetes-test-volume1'
+                        }, {
+                            'mountPath': '/foo/',
+                            'name': 'example-kubernetes-test-volume2'
+                        }]
+                    }],
+                    'hostNetwork': False,
+                    'imagePullSecrets': [],
+                    'restartPolicy': 'Never',
+                    'volumes': [{
+                        'hostPath': {'path': '/tmp/'},
+                        'name': 'example-kubernetes-test-volume1'
+                    }, {
+                        'hostPath': {'path': '/tmp/'},
+                        'name': 'example-kubernetes-test-volume2'
+                    }]
+                }
+            })
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index 422d3db..9e0c288 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -30,11 +30,13 @@ class TestPodLauncher(unittest.TestCase):
         self.pod_launcher = PodLauncher(kube_client=self.mock_kube_client)
 
     def test_read_pod_logs_successfully_returns_logs(self):
+        mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.return_value = mock.sentinel.logs
         logs = self.pod_launcher.read_pod_logs(mock.sentinel)
         self.assertEqual(mock.sentinel.logs, logs)
 
     def test_read_pod_logs_retries_successfully(self):
+        mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.side_effect = [
             BaseHTTPError('Boom'),
             mock.sentinel.logs
@@ -46,21 +48,22 @@ class TestPodLauncher(unittest.TestCase):
                 _preload_content=False,
                 container='base',
                 follow=True,
-                name=mock.sentinel.name,
-                namespace=mock.sentinel.namespace,
+                name=mock.sentinel.metadata.name,
+                namespace=mock.sentinel.metadata.namespace,
                 tail_lines=10
             ),
             mock.call(
                 _preload_content=False,
                 container='base',
                 follow=True,
-                name=mock.sentinel.name,
-                namespace=mock.sentinel.namespace,
+                name=mock.sentinel.metadata.name,
+                namespace=mock.sentinel.metadata.namespace,
                 tail_lines=10
             )
         ])
 
     def test_read_pod_logs_retries_fails(self):
+        mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.side_effect = [
             BaseHTTPError('Boom'),
             BaseHTTPError('Boom'),
@@ -73,11 +76,13 @@ class TestPodLauncher(unittest.TestCase):
         )
 
     def test_read_pod_returns_logs(self):
+        mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod.return_value = mock.sentinel.pod_info
         pod_info = self.pod_launcher.read_pod(mock.sentinel)
         self.assertEqual(mock.sentinel.pod_info, pod_info)
 
     def test_read_pod_retries_successfully(self):
+        mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod.side_effect = [
             BaseHTTPError('Boom'),
             mock.sentinel.pod_info
@@ -85,11 +90,12 @@ class TestPodLauncher(unittest.TestCase):
         pod_info = self.pod_launcher.read_pod(mock.sentinel)
         self.assertEqual(mock.sentinel.pod_info, pod_info)
         self.mock_kube_client.read_namespaced_pod.assert_has_calls([
-            mock.call(mock.sentinel.name, mock.sentinel.namespace),
-            mock.call(mock.sentinel.name, mock.sentinel.namespace)
+            mock.call(mock.sentinel.metadata.name, mock.sentinel.metadata.namespace),
+            mock.call(mock.sentinel.metadata.name, mock.sentinel.metadata.namespace)
         ])
 
     def test_read_pod_retries_fails(self):
+        mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod.side_effect = [
             BaseHTTPError('Boom'),
             BaseHTTPError('Boom'),
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
new file mode 100644
index 0000000..1b15d98
--- /dev/null
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -0,0 +1,606 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+import uuid
+from datetime import datetime
+from tests.compat import mock
+from tests.test_utils.config import conf_vars
+try:
+    from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
+    from airflow.executors.kubernetes_executor import KubeConfig
+    from airflow.kubernetes.worker_configuration import WorkerConfiguration
+    from airflow.kubernetes.pod_generator import PodGenerator
+    from airflow.exceptions import AirflowConfigException
+    from airflow.kubernetes.secret import Secret
+    import kubernetes.client.models as k8s
+    from kubernetes.client.api_client import ApiClient
+except ImportError:
+    AirflowKubernetesScheduler = None  # type: ignore
+
+
+class TestKubernetesWorkerConfiguration(unittest.TestCase):
+    """
+    Tests that the dags_volume_subpath/logs_volume_subpath configuration
+    options are passed through to the worker pod config
+    """
+
+    affinity_config = {
+        'podAntiAffinity': {
+            'requiredDuringSchedulingIgnoredDuringExecution': [
+                {
+                    'topologyKey': 'kubernetes.io/hostname',
+                    'labelSelector': {
+                        'matchExpressions': [
+                            {
+                                'key': 'app',
+                                'operator': 'In',
+                                'values': ['airflow']
+                            }
+                        ]
+                    }
+                }
+            ]
+        }
+    }
+
+    tolerations_config = [
+        {
+            'key': 'dedicated',
+            'operator': 'Equal',
+            'value': 'airflow'
+        },
+        {
+            'key': 'prod',
+            'operator': 'Exists'
+        }
+    ]
+
+    def setUp(self):
+        if AirflowKubernetesScheduler is None:
+            self.skipTest("kubernetes python package is not installed")
+
+        self.kube_config = mock.MagicMock()
+        self.kube_config.airflow_home = '/'
+        self.kube_config.airflow_dags = 'dags'
+        self.kube_config.airflow_logs = 'logs'
+        self.kube_config.dags_volume_subpath = None
+        self.kube_config.logs_volume_subpath = None
+        self.kube_config.dags_in_image = False
+        self.kube_config.dags_folder = None
+        self.kube_config.git_dags_folder_mount_point = None
+        self.kube_config.kube_labels = {'dag_id': 'original_dag_id', 'my_label': 'label_id'}
+        self.api_client = ApiClient()
+
+    @conf_vars({
+        ('kubernetes', 'git_ssh_known_hosts_configmap_name'): 'airflow-configmap',
+        ('kubernetes', 'git_ssh_key_secret_name'): 'airflow-secrets',
+        ('kubernetes', 'git_user'): 'some-user',
+        ('kubernetes', 'git_password'): 'some-password',
+        ('kubernetes', 'git_repo'): 'git@github.com:apache/airflow.git',
+        ('kubernetes', 'git_branch'): 'master',
+        ('kubernetes', 'git_dags_folder_mount_point'): '/usr/local/airflow/dags',
+        ('kubernetes', 'delete_worker_pods'): 'True',
+        ('kubernetes', 'kube_client_request_args'): '{"_request_timeout" : [60,360]}',
+    })
+    def test_worker_configuration_auth_both_ssh_and_user(self):
+        with self.assertRaisesRegexp(AirflowConfigException,
+                                     'either `git_user` and `git_password`.*'
+                                     'or `git_ssh_key_secret_name`.*'
+                                     'but not both$'):
+            KubeConfig()
+
+    def test_worker_with_subpaths(self):
+        self.kube_config.dags_volume_subpath = 'dags'
+        self.kube_config.logs_volume_subpath = 'logs'
+        self.kube_config.dags_volume_claim = 'dags'
+        self.kube_config.dags_folder = 'dags'
+        worker_config = WorkerConfiguration(self.kube_config)
+        volumes = worker_config._get_volumes()
+        volume_mounts = worker_config._get_volume_mounts()
+
+        for volume in volumes:
+            self.assertNotIn(
+                'subPath', self.api_client.sanitize_for_serialization(volume),
+                "subPath isn't valid configuration for a volume"
+            )
+
+        for volume_mount in volume_mounts:
+            if volume_mount.name != 'airflow-config':
+                self.assertIn(
+                    'subPath', self.api_client.sanitize_for_serialization(volume_mount),
+                    "subPath should've been passed to volumeMount configuration"
+                )
+
+    def test_worker_generate_dag_volume_mount_path(self):
+        self.kube_config.git_dags_folder_mount_point = '/root/airflow/git/dags'
+        self.kube_config.dags_folder = '/root/airflow/dags'
+        worker_config = WorkerConfiguration(self.kube_config)
+
+        self.kube_config.dags_volume_claim = 'airflow-dags'
+        self.kube_config.dags_volume_host = ''
+        dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
+        self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder)
+
+        self.kube_config.dags_volume_claim = ''
+        self.kube_config.dags_volume_host = '/host/airflow/dags'
+        dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
+        self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder)
+
+        self.kube_config.dags_volume_claim = ''
+        self.kube_config.dags_volume_host = ''
+        dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
+        self.assertEqual(dag_volume_mount_path,
+                         self.kube_config.git_dags_folder_mount_point)
+
+    def test_worker_environment_no_dags_folder(self):
+        self.kube_config.airflow_configmap = ''
+        self.kube_config.git_dags_folder_mount_point = ''
+        self.kube_config.dags_folder = ''
+        worker_config = WorkerConfiguration(self.kube_config)
+        env = worker_config._get_environment()
+
+        self.assertNotIn('AIRFLOW__CORE__DAGS_FOLDER', env)
+
+    def test_worker_environment_when_dags_folder_specified(self):
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_dags_folder_mount_point = ''
+        dags_folder = '/workers/path/to/dags'
+        self.kube_config.dags_folder = dags_folder
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        env = worker_config._get_environment()
+
+        self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
+
+    def test_worker_environment_dags_folder_using_git_sync(self):
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_sync_dest = 'repo'
+        self.kube_config.git_subpath = 'dags'
+        self.kube_config.git_dags_folder_mount_point = '/workers/path/to/dags'
+
+        dags_folder = '{}/{}/{}'.format(self.kube_config.git_dags_folder_mount_point,
+                                        self.kube_config.git_sync_dest,
+                                        self.kube_config.git_subpath)
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        env = worker_config._get_environment()
+
+        self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
+
+    def test_init_environment_using_git_sync_ssh_without_known_hosts(self):
+        # Tests that the init environment created with the git-sync SSH authentication
+        # option is correct without a known hosts file
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
+        self.kube_config.git_ssh_known_hosts_configmap_name = None
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        init_containers = worker_config._get_init_containers()
+
+        self.assertTrue(init_containers)  # check not empty
+        env = init_containers[0].env
+
+        self.assertIn(k8s.V1EnvVar(name='GIT_SSH_KEY_FILE', value='/etc/git-secret/ssh'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_KNOWN_HOSTS', value='false'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_SSH', value='true'), env)
+
+    def test_init_environment_using_git_sync_ssh_with_known_hosts(self):
+        # Tests that the init environment created with the git-sync SSH authentication
+        # option is correct with a known hosts file
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
+        self.kube_config.git_ssh_known_hosts_configmap_name = 'airflow-configmap'
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        init_containers = worker_config._get_init_containers()
+
+        self.assertTrue(init_containers)  # check not empty
+        env = init_containers[0].env
+
+        self.assertIn(k8s.V1EnvVar(name='GIT_SSH_KEY_FILE', value='/etc/git-secret/ssh'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_KNOWN_HOSTS', value='true'), env)
+        self.assertIn(k8s.V1EnvVar(
+            name='GIT_SSH_KNOWN_HOSTS_FILE',
+            value='/etc/git-secret/known_hosts'
+        ), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_SSH', value='true'), env)
+
+    def test_init_environment_using_git_sync_user_without_known_hosts(self):
+        # Tests that the init environment created with the git-sync user authentication
+        # option is correct without a known hosts file
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_user = 'git_user'
+        self.kube_config.git_password = 'git_password'
+        self.kube_config.git_ssh_known_hosts_configmap_name = None
+        self.kube_config.git_ssh_key_secret_name = None
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        init_containers = worker_config._get_init_containers()
+
+        self.assertTrue(init_containers)  # check not empty
+        env = init_containers[0].env
+
+        self.assertNotIn(k8s.V1EnvVar(name='GIT_SSH_KEY_FILE', value='/etc/git-secret/ssh'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_USERNAME', value='git_user'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_PASSWORD', value='git_password'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_KNOWN_HOSTS', value='false'), env)
+        self.assertNotIn(k8s.V1EnvVar(
+            name='GIT_SSH_KNOWN_HOSTS_FILE',
+            value='/etc/git-secret/known_hosts'
+        ), env)
+        self.assertNotIn(k8s.V1EnvVar(name='GIT_SYNC_SSH', value='true'), env)
+
+    def test_init_environment_using_git_sync_user_with_known_hosts(self):
+        # Tests that the init environment created with the git-sync user authentication
+        # option is correct with a known hosts file
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_user = 'git_user'
+        self.kube_config.git_password = 'git_password'
+        self.kube_config.git_ssh_known_hosts_configmap_name = 'airflow-configmap'
+        self.kube_config.git_ssh_key_secret_name = None
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        init_containers = worker_config._get_init_containers()
+
+        self.assertTrue(init_containers)  # check not empty
+        env = init_containers[0].env
+
+        self.assertNotIn(k8s.V1EnvVar(name='GIT_SSH_KEY_FILE', value='/etc/git-secret/ssh'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_USERNAME', value='git_user'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_PASSWORD', value='git_password'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_KNOWN_HOSTS', value='true'), env)
+        self.assertIn(k8s.V1EnvVar(
+            name='GIT_SSH_KNOWN_HOSTS_FILE',
+            value='/etc/git-secret/known_hosts'
+        ), env)
+        self.assertNotIn(k8s.V1EnvVar(name='GIT_SYNC_SSH', value='true'), env)
+
+    def test_init_environment_using_git_sync_run_as_user_empty(self):
+        # Tests that an empty git_sync_run_as_user yields no securityContext in the init container
+
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+        self.kube_config.git_sync_run_as_user = ''
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        init_containers = worker_config._get_init_containers()
+        self.assertTrue(init_containers)  # check not empty
+
+        self.assertIsNone(init_containers[0].security_context)
+
+    def test_make_pod_run_as_user_0(self):
+        # Tests that a pod created with run-as-user 0 actually gets that in its config
+        self.kube_config.worker_run_as_user = 0
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+        self.kube_config.worker_fs_group = None
+        self.kube_config.git_dags_folder_mount_point = 'dags'
+        self.kube_config.git_sync_dest = 'repo'
+        self.kube_config.git_subpath = 'path'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+        self.assertEqual(0, pod.spec.security_context.run_as_user)
+
+    def test_make_pod_git_sync_ssh_without_known_hosts(self):
+        # Tests that a pod created with the git-sync SSH authentication option is correct without known hosts
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+        self.kube_config.worker_fs_group = None
+        self.kube_config.git_dags_folder_mount_point = 'dags'
+        self.kube_config.git_sync_dest = 'repo'
+        self.kube_config.git_subpath = 'path'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+
+        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+        init_containers = worker_config._get_init_containers()
+        git_ssh_key_file = next((x.value for x in init_containers[0].env
+                                if x.name == 'GIT_SSH_KEY_FILE'), None)
+        volume_mount_ssh_key = next((x.mount_path for x in init_containers[0].volume_mounts
+                                    if x.name == worker_config.git_sync_ssh_secret_volume_name),
+                                    None)
+        self.assertTrue(git_ssh_key_file)
+        self.assertTrue(volume_mount_ssh_key)
+        self.assertEqual(65533, pod.spec.security_context.fs_group)
+        self.assertEqual(git_ssh_key_file,
+                         volume_mount_ssh_key,
+                         'The location where the git ssh secret is mounted'
+                         ' needs to be the same as the GIT_SSH_KEY_FILE path')
+
+    def test_make_pod_git_sync_credentials_secret(self):
+        # Tests that a pod created with git_sync_credentials_secret gets the credentials into the init container
+        self.kube_config.git_sync_credentials_secret = 'airflow-git-creds-secret'
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+        self.kube_config.worker_fs_group = None
+        self.kube_config.git_dags_folder_mount_point = 'dags'
+        self.kube_config.git_sync_dest = 'repo'
+        self.kube_config.git_subpath = 'path'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+
+        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+        username_env = k8s.V1EnvVar(
+            name='GIT_SYNC_USERNAME',
+            value_from=k8s.V1EnvVarSource(
+                secret_key_ref=k8s.V1SecretKeySelector(
+                    name=self.kube_config.git_sync_credentials_secret,
+                    key='GIT_SYNC_USERNAME')
+            )
+        )
+        password_env = k8s.V1EnvVar(
+            name='GIT_SYNC_PASSWORD',
+            value_from=k8s.V1EnvVarSource(
+                secret_key_ref=k8s.V1SecretKeySelector(
+                    name=self.kube_config.git_sync_credentials_secret,
+                    key='GIT_SYNC_PASSWORD')
+            )
+        )
+
+        self.assertIn(username_env, pod.spec.init_containers[0].env,
+                      'The username env for git credentials did not get into the init container')
+
+        self.assertIn(password_env, pod.spec.init_containers[0].env,
+                      'The password env for git credentials did not get into the init container')
+
+    def test_make_pod_git_sync_ssh_with_known_hosts(self):
+        # Tests that a pod created with the git-sync SSH authentication option is correct with known hosts
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+
+        worker_config = WorkerConfiguration(self.kube_config)
+
+        init_containers = worker_config._get_init_containers()
+        git_ssh_known_hosts_file = next((x.value for x in init_containers[0].env
+                                         if x.name == 'GIT_SSH_KNOWN_HOSTS_FILE'), None)
+
+        volume_mount_ssh_known_hosts_file = next(
+            (x.mount_path for x in init_containers[0].volume_mounts
+             if x.name == worker_config.git_sync_ssh_known_hosts_volume_name),
+            None)
+        self.assertTrue(git_ssh_known_hosts_file)
+        self.assertTrue(volume_mount_ssh_known_hosts_file)
+        self.assertEqual(git_ssh_known_hosts_file,
+                         volume_mount_ssh_known_hosts_file,
+                         'The location where the git known hosts file is mounted'
+                         ' needs to be the same as the GIT_SSH_KNOWN_HOSTS_FILE path')
+
+    def test_make_pod_with_empty_executor_config(self):
+        self.kube_config.kube_affinity = self.affinity_config
+        self.kube_config.kube_tolerations = self.tolerations_config
+        self.kube_config.dags_folder = 'dags'
+        worker_config = WorkerConfiguration(self.kube_config)
+
+        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+        self.assertIsNotNone(pod.spec.affinity['podAntiAffinity'])
+        self.assertEqual('app',
+                         pod.spec.affinity['podAntiAffinity']
+                         ['requiredDuringSchedulingIgnoredDuringExecution'][0]
+                         ['labelSelector']
+                         ['matchExpressions'][0]
+                         ['key'])
+
+        self.assertEqual(2, len(pod.spec.tolerations))
+        self.assertEqual('prod', pod.spec.tolerations[1]['key'])
+
+    def test_make_pod_with_executor_config(self):
+        self.kube_config.dags_folder = 'dags'
+        worker_config = WorkerConfiguration(self.kube_config)
+        config_pod = PodGenerator(
+            image='',
+            affinity=self.affinity_config,
+            tolerations=self.tolerations_config,
+        ).gen_pod()
+
+        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+        result = PodGenerator.reconcile_pods(pod, config_pod)
+
+        self.assertIsNotNone(result.spec.affinity['podAntiAffinity'])
+        self.assertEqual('app',
+                         result.spec.affinity['podAntiAffinity']
+                         ['requiredDuringSchedulingIgnoredDuringExecution'][0]
+                         ['labelSelector']
+                         ['matchExpressions'][0]
+                         ['key'])
+
+        self.assertEqual(2, len(result.spec.tolerations))
+        self.assertEqual('prod', result.spec.tolerations[1]['key'])
+
+    def test_worker_pvc_dags(self):
+        # Tests that the persistent volume config is created when `dags_volume_claim` is set
+        self.kube_config.dags_volume_claim = 'airflow-dags'
+        self.kube_config.dags_folder = 'dags'
+        worker_config = WorkerConfiguration(self.kube_config)
+        volumes = worker_config._get_volumes()
+        volume_mounts = worker_config._get_volume_mounts()
+
+        init_containers = worker_config._get_init_containers()
+
+        dag_volume = [volume for volume in volumes if volume.name == 'airflow-dags']
+        dag_volume_mount = [mount for mount in volume_mounts if mount.name == 'airflow-dags']
+
+        self.assertEqual('airflow-dags', dag_volume[0].persistent_volume_claim.claim_name)
+        self.assertEqual(1, len(dag_volume_mount))
+        self.assertTrue(dag_volume_mount[0].read_only)
+        self.assertEqual(0, len(init_containers))
+
+    def test_worker_git_dags(self):
+        # Tests that the dags volume config is created when `git_repo` is set
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_folder = '/usr/local/airflow/dags'
+        self.kube_config.worker_dags_folder = '/usr/local/airflow/dags'
+
+        self.kube_config.git_sync_container_repository = 'gcr.io/google-containers/git-sync-amd64'
+        self.kube_config.git_sync_container_tag = 'v2.0.5'
+        self.kube_config.git_sync_container = 'gcr.io/google-containers/git-sync-amd64:v2.0.5'
+        self.kube_config.git_sync_init_container_name = 'git-sync-clone'
+        self.kube_config.git_subpath = 'dags_folder'
+        self.kube_config.git_sync_root = '/git'
+        self.kube_config.git_sync_run_as_user = 65533
+        self.kube_config.git_dags_folder_mount_point = '/usr/local/airflow/dags/repo/dags_folder'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        volumes = worker_config._get_volumes()
+        volume_mounts = worker_config._get_volume_mounts()
+
+        dag_volume = [volume for volume in volumes if volume.name == 'airflow-dags']
+        dag_volume_mount = [mount for mount in volume_mounts if mount.name == 'airflow-dags']
+
+        self.assertIsNotNone(dag_volume[0].empty_dir)
+        self.assertEqual(self.kube_config.git_dags_folder_mount_point, dag_volume_mount[0].mount_path)
+        self.assertTrue(dag_volume_mount[0].read_only)
+
+        init_container = worker_config._get_init_containers()[0]
+        init_container_volume_mount = [mount for mount in init_container.volume_mounts
+                                       if mount.name == 'airflow-dags']
+
+        self.assertEqual('git-sync-clone', init_container.name)
+        self.assertEqual('gcr.io/google-containers/git-sync-amd64:v2.0.5', init_container.image)
+        self.assertEqual(1, len(init_container_volume_mount))
+        self.assertFalse(init_container_volume_mount[0].read_only)
+        self.assertEqual(65533, init_container.security_context.run_as_user)
+
+    def test_worker_container_dags(self):
+        # Tests that the 'airflow-dags' volume is NOT created when `dags_in_image` is set
+        self.kube_config.dags_in_image = True
+        self.kube_config.dags_folder = 'dags'
+        worker_config = WorkerConfiguration(self.kube_config)
+        volumes = worker_config._get_volumes()
+        volume_mounts = worker_config._get_volume_mounts()
+
+        dag_volume = [volume for volume in volumes if volume.name == 'airflow-dags']
+        dag_volume_mount = [mount for mount in volume_mounts if mount.name == 'airflow-dags']
+
+        init_containers = worker_config._get_init_containers()
+
+        self.assertEqual(0, len(dag_volume))
+        self.assertEqual(0, len(dag_volume_mount))
+        self.assertEqual(0, len(init_containers))
+
+    def test_kubernetes_environment_variables(self):
+        # Tests that the kubernetes environment variables get copied into the worker pods
+        input_environment = {
+            'ENVIRONMENT': 'prod',
+            'LOG_LEVEL': 'warning'
+        }
+        self.kube_config.kube_env_vars = input_environment
+        worker_config = WorkerConfiguration(self.kube_config)
+        env = worker_config._get_environment()
+        for key in input_environment:
+            self.assertIn(key, env)
+            self.assertIn(input_environment[key], env.values())
+
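+        # The worker must run tasks with the LocalExecutor, so any configured
+        # AIRFLOW__CORE__EXECUTOR value is overridden in the pod environment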
+        core_executor = 'AIRFLOW__CORE__EXECUTOR'
+        input_environment = {
+            core_executor: 'NotLocalExecutor'
+        }
+        self.kube_config.kube_env_vars = input_environment
+        worker_config = WorkerConfiguration(self.kube_config)
+        env = worker_config._get_environment()
+        self.assertEqual(env[core_executor], 'LocalExecutor')
+
+    def test_get_secrets(self):
+        # Test when secretRef is None and kube_secrets is not empty
+        self.kube_config.kube_secrets = {
+            'AWS_SECRET_KEY': 'airflow-secret=aws_secret_key',
+            'POSTGRES_PASSWORD': 'airflow-secret=postgres_credentials'
+        }
+        self.kube_config.env_from_secret_ref = None
+        worker_config = WorkerConfiguration(self.kube_config)
+        secrets = worker_config._get_secrets()
+        secrets.sort(key=lambda secret: secret.deploy_target)
+        expected = [
+            Secret('env', 'AWS_SECRET_KEY', 'airflow-secret', 'aws_secret_key'),
+            Secret('env', 'POSTGRES_PASSWORD', 'airflow-secret', 'postgres_credentials')
+        ]
+        self.assertListEqual(expected, secrets)
+
+        # Test when secret is not empty and kube_secrets is empty dict
+        self.kube_config.kube_secrets = {}
+        self.kube_config.env_from_secret_ref = 'secret_a,secret_b'
+        worker_config = WorkerConfiguration(self.kube_config)
+        secrets = worker_config._get_secrets()
+        expected = [
+            Secret('env', None, 'secret_a'),
+            Secret('env', None, 'secret_b')
+        ]
+        self.assertListEqual(expected, secrets)
+
+    def test_get_env_from(self):
+        # Test when configmap is empty
+        self.kube_config.env_from_configmap_ref = ''
+        worker_config = WorkerConfiguration(self.kube_config)
+        configmaps = worker_config._get_env_from()
+        self.assertListEqual([], configmaps)
+
+        # Test when configmap and secret refs are not empty
+        self.kube_config.env_from_configmap_ref = 'configmap_a,configmap_b'
+        self.kube_config.env_from_secret_ref = 'secretref_a,secretref_b'
+        worker_config = WorkerConfiguration(self.kube_config)
+        configmaps = worker_config._get_env_from()
+        self.assertListEqual([
+            k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='configmap_a')),
+            k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='configmap_b')),
+            k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name='secretref_a')),
+            k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name='secretref_b'))
+        ], configmaps)
+
+    def test_get_labels(self):
+        worker_config = WorkerConfiguration(self.kube_config)
+        labels = worker_config._get_labels({'my_kube_executor_label': 'kubernetes'}, {
+            'dag_id': 'override_dag_id',
+        })
+        self.assertEqual({
+            'my_label': 'label_id',
+            'dag_id': 'override_dag_id',
+            'my_kube_executor_label': 'kubernetes'
+        }, labels)
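The label assertions above come down to an ordered dict merge: static labels
from airflow.cfg first, then the executor-supplied labels, then the per-task
overrides. A minimal sketch of that merge order (names here are illustrative;
the real implementation is WorkerConfiguration._get_labels in
airflow/kubernetes/worker_configuration.py and may differ in detail):

    def get_labels(config_labels, kube_executor_labels, task_labels):
        merged = dict(config_labels)          # static labels from airflow.cfg
        merged.update(kube_executor_labels)   # executor-supplied labels
        merged.update(task_labels)            # per-task overrides win last
        return merged

    assert get_labels(
        {'my_label': 'label_id', 'dag_id': 'original'},
        {'my_kube_executor_label': 'kubernetes'},
        {'dag_id': 'override_dag_id'},
    ) == {
        'my_label': 'label_id',
        'dag_id': 'override_dag_id',
        'my_kube_executor_label': 'kubernetes',
    }

Under that order the last argument can override any same-named key from the
config, which is exactly what the 'dag_id': 'override_dag_id' assertion checks.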
diff --git a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py b/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
index 96e06fe..c9bc741 100644
--- a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
+++ b/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
@@ -19,20 +19,22 @@ import json
 import os
 import shutil
 import unittest
-from tests.compat import mock
 
+import kubernetes.client.models as k8s
 import pytest
 from kubernetes.client.api_client import ApiClient
 from kubernetes.client.rest import ApiException
 
+from airflow.kubernetes.volume import Volume
 from airflow import AirflowException
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
 from airflow.kubernetes.pod import Port
+from airflow.kubernetes.pod_generator import PodDefaults
 from airflow.kubernetes.pod_launcher import PodLauncher
 from airflow.kubernetes.secret import Secret
-from airflow.kubernetes.volume import Volume
 from airflow.kubernetes.volume_mount import VolumeMount
 from airflow.version import version as airflow_version
+from tests.compat import mock
 
 
 @pytest.mark.runtime("kubernetes")
@@ -49,7 +51,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
                 'name': mock.ANY,
                 'annotations': {},
                 'labels': {
-                    'foo': 'bar', 'kubernetes_pod_operator': 'True',
+                    'foo': 'bar',
+                    'kubernetes_pod_operator': 'True',
                     'airflow_version': airflow_version.replace('+', '-')
                 }
             },
@@ -77,30 +80,10 @@ class TestKubernetesPodOperator(unittest.TestCase):
             }
         }
 
-    def test_do_xcom_push_defaults_false(self):
-        new_config_path = '/tmp/kube_config'
-        old_config_path = os.path.expanduser('~/.kube/config')
-        shutil.copy(old_config_path, new_config_path)
-
-        k = KubernetesPodOperator(
-            namespace='default',
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="test",
-            task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            config_file=new_config_path,
-        )
-        self.assertFalse(k.do_xcom_push)
-
     def test_config_path_move(self):
         new_config_path = '/tmp/kube_config'
         old_config_path = os.path.expanduser('~/.kube/config')
         shutil.copy(old_config_path, new_config_path)
-
         k = KubernetesPodOperator(
             namespace='default',
             image="ubuntu:16.04",
@@ -114,6 +97,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             config_file=new_config_path,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.assertEqual(self.expected_pod, actual_pod)
 
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
     @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
@@ -163,8 +148,10 @@ class TestKubernetesPodOperator(unittest.TestCase):
         )
         mock_launcher.return_value = (State.SUCCESS, None)
         k.execute(None)
-        self.assertEqual(mock_launcher.call_args[0][0].image_pull_secrets,
-                         fake_pull_secrets)
+        self.assertEqual(
+            mock_launcher.call_args[0][0].spec.image_pull_secrets,
+            [k8s.V1LocalObjectReference(name=fake_pull_secrets)]
+        )
 
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.delete_pod")
@@ -201,6 +188,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_delete_operator_pod(self):
         k = KubernetesPodOperator(
@@ -216,6 +205,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_hostnetwork(self):
         k = KubernetesPodOperator(
@@ -231,6 +222,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             hostnetwork=True,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['hostNetwork'] = True
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_dnspolicy(self):
         dns_policy = "ClusterFirstWithHostNet"
@@ -248,6 +242,10 @@ class TestKubernetesPodOperator(unittest.TestCase):
             dnspolicy=dns_policy,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['hostNetwork'] = True
+        self.expected_pod['spec']['dnsPolicy'] = dns_policy
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_node_selectors(self):
         node_selectors = {
@@ -266,6 +264,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             node_selectors=node_selectors,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['nodeSelector'] = node_selectors
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_resources(self):
         resources = {
@@ -287,6 +288,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             resources=resources,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['containers'][0]['resources'] = resources
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_affinity(self):
         affinity = {
@@ -319,6 +323,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             affinity=affinity,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['affinity'] = affinity
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_port(self):
         port = Port('http', 80)
@@ -336,6 +343,12 @@ class TestKubernetesPodOperator(unittest.TestCase):
             ports=[port],
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['containers'][0]['ports'] = [{
+            'name': 'http',
+            'containerPort': 80
+        }]
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_volume_mount(self):
         with mock.patch.object(PodLauncher, 'log') as mock_logger:
@@ -368,6 +381,20 @@ class TestKubernetesPodOperator(unittest.TestCase):
             )
             k.execute(None)
             mock_logger.info.assert_any_call(b"retrieved from mount\n")
+            actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+            self.expected_pod['spec']['containers'][0]['args'] = args
+            self.expected_pod['spec']['containers'][0]['volumeMounts'] = [{
+                'name': 'test-volume',
+                'mountPath': '/tmp/test_volume',
+                'readOnly': False
+            }]
+            self.expected_pod['spec']['volumes'] = [{
+                'name': 'test-volume',
+                'persistentVolumeClaim': {
+                    'claimName': 'test-volume'
+                }
+            }]
+            self.assertEqual(self.expected_pod, actual_pod)
 
     def test_run_as_user_root(self):
         security_context = {
@@ -388,6 +415,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             security_context=security_context,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['securityContext'] = security_context
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_run_as_user_non_root(self):
         security_context = {
@@ -409,6 +439,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             security_context=security_context,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['securityContext'] = security_context
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_fs_group(self):
         security_context = {
@@ -430,6 +463,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             security_context=security_context,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['securityContext'] = security_context
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_faulty_image(self):
         bad_image_name = "foobar"
@@ -447,6 +483,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
         )
         with self.assertRaises(AirflowException):
             k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['containers'][0]['image'] = bad_image_name
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_faulty_service_account(self):
         bad_service_account_name = "foobar"
@@ -465,6 +504,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
         )
         with self.assertRaises(ApiException):
             k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['serviceAccountName'] = bad_service_account_name
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_failure(self):
         """
@@ -484,6 +526,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
         )
         with self.assertRaises(AirflowException):
             k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['containers'][0]['args'] = bad_internal_command
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_xcom_push(self):
         return_value = '{"foo": "bar"\n, "buzz": 2}'
@@ -500,13 +545,23 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=True,
         )
         self.assertEqual(k.execute(None), json.loads(return_value))
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        volume = self.api_client.sanitize_for_serialization(PodDefaults.VOLUME)
+        volume_mount = self.api_client.sanitize_for_serialization(PodDefaults.VOLUME_MOUNT)
+        container = self.api_client.sanitize_for_serialization(PodDefaults.SIDECAR_CONTAINER)
+        self.expected_pod['spec']['containers'][0]['args'] = args
+        self.expected_pod['spec']['containers'][0]['volumeMounts'].insert(0, volume_mount)
+        self.expected_pod['spec']['volumes'].insert(0, volume)
+        self.expected_pod['spec']['containers'].append(container)
+        self.assertEqual(self.expected_pod, actual_pod)
 
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
     @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
     def test_envs_from_configmaps(self, mock_client, mock_launcher):
         # GIVEN
         from airflow.utils.state import State
-        configmaps = ['test-configmap']
+
+        configmap = 'test-configmap'
         # WHEN
         k = KubernetesPodOperator(
             namespace='default',
@@ -518,12 +573,17 @@ class TestKubernetesPodOperator(unittest.TestCase):
             task_id="task",
             in_cluster=False,
             do_xcom_push=False,
-            configmaps=configmaps
+            configmaps=[configmap]
         )
         # THEN
         mock_launcher.return_value = (State.SUCCESS, None)
         k.execute(None)
-        self.assertEqual(mock_launcher.call_args[0][0].configmaps, configmaps)
+        self.assertEqual(
+            mock_launcher.call_args[0][0].spec.containers[0].env_from,
+            [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(
+                name=configmap
+            ))]
+        )
 
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
     @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
@@ -548,7 +608,12 @@ class TestKubernetesPodOperator(unittest.TestCase):
         # THEN
         mock_launcher.return_value = (State.SUCCESS, None)
         k.execute(None)
-        self.assertEqual(mock_launcher.call_args[0][0].secrets, secrets)
+        self.assertEqual(
+            mock_launcher.call_args[0][0].spec.containers[0].env_from,
+            [k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(
+                name=secret_ref
+            ))]
+        )
 
 
 # pylint: enable=unused-argument
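Every test updated above follows the same assertion pattern: execute the
operator, serialize the resulting V1Pod back into the plain camelCase dict
the API server would receive, and deep-compare it against a mutated copy of
the expected template. sanitize_for_serialization is the kubernetes-client
ApiClient method the tests already use; the standalone sketch below (the
helper name is ours) shows the pattern in isolation:

    import kubernetes.client.models as k8s
    from kubernetes.client.api_client import ApiClient

    def pod_as_dict(pod):
        # Drop unset fields and convert attribute names to the camelCase
        # form the Kubernetes API expects, yielding a comparable plain dict.
        return ApiClient().sanitize_for_serialization(pod)

    pod = k8s.V1Pod(
        metadata=k8s.V1ObjectMeta(name='demo'),
        spec=k8s.V1PodSpec(
            containers=[k8s.V1Container(name='base', image='ubuntu:16.04')]
        ),
    )
    assert pod_as_dict(pod) == {
        'metadata': {'name': 'demo'},
        'spec': {'containers': [{'name': 'base', 'image': 'ubuntu:16.04'}]},
    }

Comparing plain dicts rather than V1Pod objects keeps the expected values
readable and makes failing tests print an understandable diff.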
diff --git a/tests/test_local_settings.py b/tests/test_local_settings.py
index fdc6cb9..3497ee2 100644
--- a/tests/test_local_settings.py
+++ b/tests/test_local_settings.py
@@ -23,8 +23,6 @@ import tempfile
 import unittest
 from tests.compat import MagicMock, Mock, call, patch
 
-from airflow.kubernetes.pod import Pod
-
 
 SETTINGS_FILE_POLICY = """
 def test_policy(task_instance):
@@ -149,7 +147,7 @@ class LocalSettingsTest(unittest.TestCase):
             from airflow import settings
             settings.import_local_settings()  # pylint: ignore
 
-            pod = Pod(image="ubuntu", envs={}, cmds=['echo "1"'])
+            pod = MagicMock()
             settings.pod_mutation_hook(pod)
 
             assert pod.namespace == 'airflow-tests'
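The MagicMock stands in for the removed Pod class because pod_mutation_hook
only needs an object whose attributes can be set; the hook itself comes from
the settings file the test imports, presumably defined elsewhere in
tests/test_local_settings.py and elided from this hunk. A minimal sketch of
what that hook would have to contain for the namespace assertion to hold
(hypothetical settings body, not this diff's own code):

    # airflow_local_settings.py -- airflow.settings calls this hook for
    # every pod before launch; the hook mutates the pod object in place.
    def pod_mutation_hook(pod):
        pod.namespace = 'airflow-tests'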