You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/02 00:59:06 UTC

[airflow] branch v1-10-test updated (e676e59 -> aaaa366)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    omit e676e59  [AIRFLOW-4851] Refactor K8S codebase with k8s API models (#5481)
     new aaaa366  [AIRFLOW-4851] Refactor K8S codebase with k8s API models (#5481)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (e676e59)
            \
             N -- N -- N   refs/heads/v1-10-test (aaaa366)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 scripts/ci/in_container/kubernetes/app/secrets.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[airflow] 01/01: [AIRFLOW-4851] Refactor K8S codebase with k8s API models (#5481)

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit aaaa366527bf95801acaef7f01107a3cdd5ef938
Author: davlum <da...@gmail.com>
AuthorDate: Mon Jun 1 17:57:44 2020 -0700

    [AIRFLOW-4851] Refactor K8S codebase with k8s API models (#5481)
    
    * [AIRFLOW-4851] refactor Airflow kubernetes
    
    * [AIRFLOW-4851] refactor Airflow k8s models
    
    * [AIRFLOW-4851] Fix linting and tests
    * Refactor and add some tests
    
    * [AIRFLOW-4851] Add assertions to PodOperator tests
    
    Author: davlum <da...@gmail.com>
    Date:   Wed Sep 4 17:24:31 2019 -0400
    
    (cherry picked from commit 17d4179db2d4424b2bbd4a7f9546a0ca3628cac2)
---
 .../contrib/operators/kubernetes_pod_operator.py   | 194 +++--
 airflow/executors/kubernetes_executor.py           | 102 +--
 airflow/kubernetes/k8s_model.py                    |  61 ++
 .../kubernetes_request_factory/__init__.py         |  16 -
 .../kubernetes_request_factory.py                  | 258 ------
 .../pod_request_factory.py                         | 139 ----
 airflow/kubernetes/pod.py                          | 124 +--
 airflow/kubernetes/pod_generator.py                | 470 +++++++----
 airflow/kubernetes/pod_launcher.py                 |  58 +-
 airflow/kubernetes/pod_runtime_info_env.py         |  28 +-
 airflow/kubernetes/secret.py                       |  63 +-
 airflow/kubernetes/volume.py                       |  20 +-
 airflow/kubernetes/volume_mount.py                 |  24 +-
 airflow/kubernetes/worker_configuration.py         | 516 ++++++------
 airflow/settings.py                                |   5 +-
 .../ci/in_container/kubernetes/app/deploy_app.sh   |   4 +
 .../ci/in_container/kubernetes/app/secrets.yaml    |   4 +-
 .../ci/in_container/kubernetes/app/volumes.yaml    |   4 +-
 tests/executors/test_kubernetes_executor.py        | 882 +--------------------
 .../test_kubernetes_request_factory.py             | 396 ---------
 .../test_pod_request_factory.py                    | 175 ----
 .../__init__.py                                    |   0
 tests/kubernetes/models/test_pod.py                |  76 ++
 tests/kubernetes/models/test_secret.py             | 115 +++
 tests/kubernetes/test_pod_generator.py             | 328 ++++++++
 tests/kubernetes/test_pod_launcher.py              |  18 +-
 tests/kubernetes/test_worker_configuration.py      | 606 ++++++++++++++
 .../kubernetes/test_kubernetes_pod_operator.py     | 123 ++-
 tests/test_local_settings.py                       |   4 +-
 29 files changed, 2195 insertions(+), 2618 deletions(-)

diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index d121510..bacf2ed 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -15,15 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 """Executes task in a Kubernetes POD"""
-import re
 import warnings
 
 from airflow.exceptions import AirflowException
+from airflow.kubernetes import pod_generator, kube_client, pod_launcher
+from airflow.kubernetes.k8s_model import append_to_pod
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
-from airflow.kubernetes import kube_client, pod_generator, pod_launcher
-from airflow.kubernetes.pod import Resources
-from airflow.utils.helpers import validate_key
 from airflow.utils.state import State
 from airflow.version import version as airflow_version
 
@@ -49,13 +47,13 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                                If more than one secret is required, provide a
                                comma separated list: secret_a,secret_b
     :type image_pull_secrets: str
-    :param ports: ports for launched pod.
-    :type ports: list[airflow.kubernetes.pod.Port]
-    :param volume_mounts: volumeMounts for launched pod.
-    :type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
-    :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
-    :type volumes: list[airflow.kubernetes.volume.Volume]
-    :param labels: labels to apply to the Pod.
+    :param ports: ports for launched pod
+    :type ports: list[airflow.kubernetes.models.port.Port]
+    :param volume_mounts: volumeMounts for launched pod
+    :type volume_mounts: list[airflow.kubernetes.models.volume_mount.VolumeMount]
+    :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes
+    :type volumes: list[airflow.kubernetes.models.volume.Volume]
+    :param labels: labels to apply to the Pod
     :type labels: dict
     :param startup_timeout_seconds: timeout in seconds to startup the pod.
     :type startup_timeout_seconds: int
@@ -64,10 +62,10 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
     :type name: str
     :param env_vars: Environment variables initialized in the container. (templated)
     :type env_vars: dict
-    :param secrets: Kubernetes secrets to inject in the container.
-        They can be exposed as environment vars or files in a volume
-    :type secrets: list[airflow.kubernetes.secret.Secret]
-    :param in_cluster: run kubernetes client with in_cluster configuration.
+    :param secrets: Kubernetes secrets to inject in the container,
+        They can be exposed as environment vars or files in a volume.
+    :type secrets: list[airflow.kubernetes.models.secret.Secret]
+    :param in_cluster: run kubernetes client with in_cluster configuration
     :type in_cluster: bool
     :param cluster_context: context that points to kubernetes cluster.
         Ignored when in_cluster is True. If None, current-context is used.
@@ -105,15 +103,82 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         want mount as env variables.
     :type configmaps: list[str]
     :param pod_runtime_info_envs: environment variables about
-                                  pod runtime information (ip, namespace, nodeName, podName).
-    :type pod_runtime_info_envs: list[PodRuntimeEnv]
-    :param security_context: security options the pod should run with (PodSecurityContext).
-    :type security_context: dict
-    :param dnspolicy: dnspolicy for the pod.
+                                  pod runtime information (ip, namespace, nodeName, podName)
+    :type pod_runtime_info_envs: list[airflow.kubernetes.models.pod_runtime_info_env.PodRuntimeInfoEnv]
+    :param dnspolicy: Specify a dnspolicy for the pod
     :type dnspolicy: str
+    :param full_pod_spec: The complete podSpec
+    :type full_pod_spec: kubernetes.client.models.V1Pod
     """
     template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
 
+    def execute(self, context):
+        try:
+            client = kube_client.get_kube_client(in_cluster=self.in_cluster,
+                                                 cluster_context=self.cluster_context,
+                                                 config_file=self.config_file)
+            # Add Airflow Version to the label
+            # And a label to identify that pod is launched by KubernetesPodOperator
+            self.labels.update(
+                {
+                    'airflow_version': airflow_version.replace('+', '-'),
+                    'kubernetes_pod_operator': 'True',
+                }
+            )
+
+            pod = pod_generator.PodGenerator(
+                image=self.image,
+                namespace=self.namespace,
+                cmds=self.cmds,
+                args=self.arguments,
+                labels=self.labels,
+                name=self.name,
+                envs=self.env_vars,
+                extract_xcom=self.do_xcom_push,
+                image_pull_policy=self.image_pull_policy,
+                node_selectors=self.node_selectors,
+                annotations=self.annotations,
+                affinity=self.affinity,
+                image_pull_secrets=self.image_pull_secrets,
+                service_account_name=self.service_account_name,
+                hostnetwork=self.hostnetwork,
+                tolerations=self.tolerations,
+                configmaps=self.configmaps,
+                security_context=self.security_context,
+                dnspolicy=self.dnspolicy,
+                resources=self.resources,
+                pod=self.full_pod_spec,
+            ).gen_pod()
+
+            pod = append_to_pod(pod, self.ports)
+            pod = append_to_pod(pod, self.pod_runtime_info_envs)
+            pod = append_to_pod(pod, self.volumes)
+            pod = append_to_pod(pod, self.volume_mounts)
+            pod = append_to_pod(pod, self.secrets)
+
+            self.pod = pod
+
+            launcher = pod_launcher.PodLauncher(kube_client=client,
+                                                extract_xcom=self.do_xcom_push)
+
+            try:
+                (final_state, result) = launcher.run_pod(
+                    pod,
+                    startup_timeout=self.startup_timeout_seconds,
+                    get_logs=self.get_logs)
+            finally:
+                if self.is_delete_operator_pod:
+                    launcher.delete_pod(pod)
+
+            if final_state != State.SUCCESS:
+                raise AirflowException(
+                    'Pod returned a failure: {state}'.format(state=final_state)
+                )
+
+            return result
+        except AirflowException as ex:
+            raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
+
     @apply_defaults
     def __init__(self,  # pylint: disable=too-many-arguments,too-many-locals
                  namespace,
@@ -136,6 +201,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                  resources=None,
                  affinity=None,
                  config_file=None,
+                 do_xcom_push=False,
                  node_selectors=None,
                  image_pull_secrets=None,
                  service_account_name='default',
@@ -146,7 +212,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                  security_context=None,
                  pod_runtime_info_envs=None,
                  dnspolicy=None,
-                 do_xcom_push=False,
+                 full_pod_spec=None,
                  *args,
                  **kwargs):
         # https://github.com/apache/airflow/blob/2d0eff4ee4fafcf8c7978ac287a8fb968e56605f/UPDATING.md#unification-of-do_xcom_push-flag
@@ -157,6 +223,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                 DeprecationWarning, stacklevel=2
             )
         super(KubernetesPodOperator, self).__init__(*args, resources=None, **kwargs)
+        self.pod = None
         self.do_xcom_push = do_xcom_push
         self.image = image
         self.namespace = namespace
@@ -164,7 +231,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         self.arguments = arguments or []
         self.labels = labels or {}
         self.startup_timeout_seconds = startup_timeout_seconds
-        self.name = self._set_name(name)
+        self.name = name
         self.env_vars = env_vars or {}
         self.ports = ports or []
         self.volume_mounts = volume_mounts or []
@@ -177,7 +244,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         self.node_selectors = node_selectors or {}
         self.annotations = annotations or {}
         self.affinity = affinity or {}
-        self.resources = self._set_resources(resources)
+        self.resources = resources
         self.config_file = config_file
         self.image_pull_secrets = image_pull_secrets
         self.service_account_name = service_account_name
@@ -188,83 +255,4 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         self.security_context = security_context or {}
         self.pod_runtime_info_envs = pod_runtime_info_envs or []
         self.dnspolicy = dnspolicy
-
-    def execute(self, context):
-        try:
-            if self.in_cluster is not None:
-                client = kube_client.get_kube_client(in_cluster=self.in_cluster,
-                                                     cluster_context=self.cluster_context,
-                                                     config_file=self.config_file)
-            else:
-                client = kube_client.get_kube_client(cluster_context=self.cluster_context,
-                                                     config_file=self.config_file)
-
-            # Add Airflow Version to the label
-            # And a label to identify that pod is launched by KubernetesPodOperator
-            self.labels.update(
-                {
-                    'airflow_version': airflow_version.replace('+', '-'),
-                    'kubernetes_pod_operator': 'True',
-                }
-            )
-
-            gen = pod_generator.PodGenerator()
-
-            for port in self.ports:
-                gen.add_port(port)
-            for mount in self.volume_mounts:
-                gen.add_mount(mount)
-            for volume in self.volumes:
-                gen.add_volume(volume)
-
-            pod = gen.make_pod(
-                namespace=self.namespace,
-                image=self.image,
-                pod_id=self.name,
-                cmds=self.cmds,
-                arguments=self.arguments,
-                labels=self.labels,
-            )
-
-            pod.service_account_name = self.service_account_name
-            pod.secrets = self.secrets
-            pod.envs = self.env_vars
-            pod.image_pull_policy = self.image_pull_policy
-            pod.image_pull_secrets = self.image_pull_secrets
-            pod.annotations = self.annotations
-            pod.resources = self.resources
-            pod.affinity = self.affinity
-            pod.node_selectors = self.node_selectors
-            pod.hostnetwork = self.hostnetwork
-            pod.tolerations = self.tolerations
-            pod.configmaps = self.configmaps
-            pod.security_context = self.security_context
-            pod.pod_runtime_info_envs = self.pod_runtime_info_envs
-            pod.dnspolicy = self.dnspolicy
-
-            launcher = pod_launcher.PodLauncher(kube_client=client,
-                                                extract_xcom=self.do_xcom_push)
-            try:
-                (final_state, result) = launcher.run_pod(
-                    pod,
-                    startup_timeout=self.startup_timeout_seconds,
-                    get_logs=self.get_logs)
-            finally:
-                if self.is_delete_operator_pod:
-                    launcher.delete_pod(pod)
-
-            if final_state != State.SUCCESS:
-                raise AirflowException(
-                    'Pod returned a failure: {state}'.format(state=final_state)
-                )
-            if self.do_xcom_push:
-                return result
-        except AirflowException as ex:
-            raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
-
-    def _set_resources(self, resources):
-        return Resources(**resources) if resources else Resources()
-
-    def _set_name(self, name):
-        validate_key(name, max_length=63)
-        return re.sub(r'[^a-z0-9.-]+', '-', name.lower())
+        self.full_pod_spec = full_pod_spec
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 32f4fd5..6944274 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -36,8 +36,8 @@ from airflow.configuration import conf
 from airflow.kubernetes.pod_launcher import PodLauncher
 from airflow.kubernetes.kube_client import get_kube_client
 from airflow.kubernetes.worker_configuration import WorkerConfiguration
+from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.executors.base_executor import BaseExecutor
-from airflow.executors import Executors
 from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance
 from airflow.utils.state import State
 from airflow.utils.db import provide_session, create_session
@@ -49,87 +49,6 @@ MAX_POD_ID_LEN = 253
 MAX_LABEL_LEN = 63
 
 
-class KubernetesExecutorConfig:
-    def __init__(self, image=None, image_pull_policy=None, request_memory=None,
-                 request_cpu=None, limit_memory=None, limit_cpu=None, limit_gpu=None,
-                 gcp_service_account_key=None, node_selectors=None, affinity=None,
-                 annotations=None, volumes=None, volume_mounts=None, tolerations=None, labels=None):
-        self.image = image
-        self.image_pull_policy = image_pull_policy
-        self.request_memory = request_memory
-        self.request_cpu = request_cpu
-        self.limit_memory = limit_memory
-        self.limit_cpu = limit_cpu
-        self.limit_gpu = limit_gpu
-        self.gcp_service_account_key = gcp_service_account_key
-        self.node_selectors = node_selectors
-        self.affinity = affinity
-        self.annotations = annotations
-        self.volumes = volumes
-        self.volume_mounts = volume_mounts
-        self.tolerations = tolerations
-        self.labels = labels or {}
-
-    def __repr__(self):
-        return "{}(image={}, image_pull_policy={}, request_memory={}, request_cpu={}, " \
-               "limit_memory={}, limit_cpu={}, limit_gpu={}, gcp_service_account_key={}, " \
-               "node_selectors={}, affinity={}, annotations={}, volumes={}, " \
-               "volume_mounts={}, tolerations={}, labels={})" \
-            .format(KubernetesExecutorConfig.__name__, self.image, self.image_pull_policy,
-                    self.request_memory, self.request_cpu, self.limit_memory,
-                    self.limit_cpu, self.limit_gpu, self.gcp_service_account_key, self.node_selectors,
-                    self.affinity, self.annotations, self.volumes, self.volume_mounts,
-                    self.tolerations, self.labels)
-
-    @staticmethod
-    def from_dict(obj):
-        if obj is None:
-            return KubernetesExecutorConfig()
-
-        if not isinstance(obj, dict):
-            raise TypeError(
-                'Cannot convert a non-dictionary object into a KubernetesExecutorConfig')
-
-        namespaced = obj.get(Executors.KubernetesExecutor, {})
-
-        return KubernetesExecutorConfig(
-            image=namespaced.get('image', None),
-            image_pull_policy=namespaced.get('image_pull_policy', None),
-            request_memory=namespaced.get('request_memory', None),
-            request_cpu=namespaced.get('request_cpu', None),
-            limit_memory=namespaced.get('limit_memory', None),
-            limit_cpu=namespaced.get('limit_cpu', None),
-            limit_gpu=namespaced.get('limit_gpu', None),
-            gcp_service_account_key=namespaced.get('gcp_service_account_key', None),
-            node_selectors=namespaced.get('node_selectors', None),
-            affinity=namespaced.get('affinity', None),
-            annotations=namespaced.get('annotations', {}),
-            volumes=namespaced.get('volumes', []),
-            volume_mounts=namespaced.get('volume_mounts', []),
-            tolerations=namespaced.get('tolerations', None),
-            labels=namespaced.get('labels', {}),
-        )
-
-    def as_dict(self):
-        return {
-            'image': self.image,
-            'image_pull_policy': self.image_pull_policy,
-            'request_memory': self.request_memory,
-            'request_cpu': self.request_cpu,
-            'limit_memory': self.limit_memory,
-            'limit_cpu': self.limit_cpu,
-            'limit_gpu': self.limit_gpu,
-            'gcp_service_account_key': self.gcp_service_account_key,
-            'node_selectors': self.node_selectors,
-            'affinity': self.affinity,
-            'annotations': self.annotations,
-            'volumes': self.volumes,
-            'volume_mounts': self.volume_mounts,
-            'tolerations': self.tolerations,
-            'labels': self.labels,
-        }
-
-
 class KubeConfig:
     """Configuration for Kubernetes"""
     core_section = 'core'
@@ -473,17 +392,23 @@ class AirflowKubernetesScheduler(LoggingMixin):
         self.log.info('Kubernetes job is %s', str(next_job))
         key, command, kube_executor_config = next_job
         dag_id, task_id, execution_date, try_number = key
-        self.log.debug("Kubernetes running for command %s", command)
-        self.log.debug("Kubernetes launching image %s", self.kube_config.kube_image)
-        pod = self.worker_configuration.make_pod(
-            namespace=self.namespace, worker_uuid=self.worker_uuid,
+
+        config_pod = self.worker_configuration.make_pod(
+            namespace=self.namespace,
+            worker_uuid=self.worker_uuid,
             pod_id=self._create_pod_id(dag_id, task_id),
             dag_id=self._make_safe_label_value(dag_id),
             task_id=self._make_safe_label_value(task_id),
             try_number=try_number,
             execution_date=self._datetime_to_label_safe_datestring(execution_date),
-            airflow_command=command, kube_executor_config=kube_executor_config
+            airflow_command=command
         )
+        # Reconcile the pod generated by the Operator and the Pod
+        # generated by the .cfg file
+        pod = PodGenerator.reconcile_pods(config_pod, kube_executor_config)
+        self.log.debug("Kubernetes running for command %s", command)
+        self.log.debug("Kubernetes launching image %s", pod.spec.containers[0].image)
+
         # the watcher will monitor pods, so we do not block.
         self.launcher.run_pod_async(pod, **self.kube_config.kube_client_request_args)
         self.log.debug("Kubernetes Job created!")
@@ -841,7 +766,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
             'Add task %s with command %s with executor_config %s',
             key, command, executor_config
         )
-        kube_executor_config = KubernetesExecutorConfig.from_dict(executor_config)
+
+        kube_executor_config = PodGenerator.from_obj(executor_config)
         self.task_queue.put((key, command, kube_executor_config))
 
     def sync(self):
diff --git a/airflow/kubernetes/k8s_model.py b/airflow/kubernetes/k8s_model.py
new file mode 100644
index 0000000..7049b1d
--- /dev/null
+++ b/airflow/kubernetes/k8s_model.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
+
+import abc
+import sys
+from functools import reduce
+
+if sys.version_info >= (3, 4):
+    ABC = abc.ABC
+else:
+    ABC = abc.ABCMeta('ABC', (), {})
+
+
+class K8SModel(ABC):
+    """
+    These Airflow Kubernetes models are here for backwards compatibility
+    reasons only. Ideally clients should use the kubernetes api
+    and the process of
+
+        client input -> Airflow k8s models -> k8s models
+
+    can be avoided. All of these models implement the
+    `attach_to_pod` method so that they integrate with the kubernetes client.
+    """
+    @abc.abstractmethod
+    def attach_to_pod(self, pod):
+        """
+        :param pod: A pod to attach this Kubernetes object to
+        :type pod: kubernetes.client.models.V1Pod
+        :return: The pod with the object attached
+        """
+
+
+def append_to_pod(pod, k8s_objects):
+    """
+    :param pod: A pod to attach a list of Kubernetes objects to
+    :type pod: kubernetes.client.models.V1Pod
+    :param k8s_objects: a potential None list of K8SModels
+    :type k8s_objects: Optional[List[K8SModel]]
+    :return: pod with the objects attached if they exist
+    """
+    if not k8s_objects:
+        return pod
+    return reduce(lambda p, o: o.attach_to_pod(p), k8s_objects, pod)
diff --git a/airflow/kubernetes/kubernetes_request_factory/__init__.py b/airflow/kubernetes/kubernetes_request_factory/__init__.py
deleted file mode 100644
index 13a8339..0000000
--- a/airflow/kubernetes/kubernetes_request_factory/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/airflow/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
deleted file mode 100644
index ad58510..0000000
--- a/airflow/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ /dev/null
@@ -1,258 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from abc import ABCMeta, abstractmethod
-import six
-
-
-class KubernetesRequestFactory:
-    """
-    Create requests to be sent to kube API.
-    Extend this class to talk to kubernetes and generate your specific resources.
-    This is equivalent of generating yaml files that can be used by `kubectl`
-    """
-    __metaclass__ = ABCMeta
-
-    @abstractmethod
-    def create(self, pod):
-        """
-        Creates the request for kubernetes API.
-
-        :param pod: The pod object
-        """
-
-    @staticmethod
-    def extract_image(pod, req):
-        req['spec']['containers'][0]['image'] = pod.image
-
-    @staticmethod
-    def extract_image_pull_policy(pod, req):
-        if pod.image_pull_policy:
-            req['spec']['containers'][0]['imagePullPolicy'] = pod.image_pull_policy
-
-    @staticmethod
-    def add_secret_to_env(env, secret):
-        env.append({
-            'name': secret.deploy_target,
-            'valueFrom': {
-                'secretKeyRef': {
-                    'name': secret.secret,
-                    'key': secret.key
-                }
-            }
-        })
-
-    @staticmethod
-    def add_runtime_info_env(env, runtime_info):
-        env.append({
-            'name': runtime_info.name,
-            'valueFrom': {
-                'fieldRef': {
-                    'fieldPath': runtime_info.field_path
-                }
-            }
-        })
-
-    @staticmethod
-    def extract_labels(pod, req):
-        req['metadata']['labels'] = req['metadata'].get('labels', {})
-        for k, v in six.iteritems(pod.labels):
-            req['metadata']['labels'][k] = v
-
-    @staticmethod
-    def extract_annotations(pod, req):
-        req['metadata']['annotations'] = req['metadata'].get('annotations', {})
-        for k, v in six.iteritems(pod.annotations):
-            req['metadata']['annotations'][k] = v
-
-    @staticmethod
-    def extract_affinity(pod, req):
-        req['spec']['affinity'] = req['spec'].get('affinity', {})
-        for k, v in six.iteritems(pod.affinity):
-            req['spec']['affinity'][k] = v
-
-    @staticmethod
-    def extract_node_selector(pod, req):
-        req['spec']['nodeSelector'] = req['spec'].get('nodeSelector', {})
-        for k, v in six.iteritems(pod.node_selectors):
-            req['spec']['nodeSelector'][k] = v
-
-    @staticmethod
-    def extract_cmds(pod, req):
-        req['spec']['containers'][0]['command'] = pod.cmds
-
-    @staticmethod
-    def extract_args(pod, req):
-        req['spec']['containers'][0]['args'] = pod.args
-
-    @staticmethod
-    def attach_ports(pod, req):
-        req['spec']['containers'][0]['ports'] = (
-            req['spec']['containers'][0].get('ports', []))
-        if len(pod.ports) > 0:
-            req['spec']['containers'][0]['ports'].extend(pod.ports)
-
-    @staticmethod
-    def attach_volumes(pod, req):
-        req['spec']['volumes'] = (
-            req['spec'].get('volumes', []))
-        if len(pod.volumes) > 0:
-            req['spec']['volumes'].extend(pod.volumes)
-
-    @staticmethod
-    def attach_volume_mounts(pod, req):
-        if len(pod.volume_mounts) > 0:
-            req['spec']['containers'][0]['volumeMounts'] = (
-                req['spec']['containers'][0].get('volumeMounts', []))
-            req['spec']['containers'][0]['volumeMounts'].extend(pod.volume_mounts)
-
-    @staticmethod
-    def extract_name(pod, req):
-        req['metadata']['name'] = pod.name
-
-    @staticmethod
-    def extract_volume_secrets(pod, req):
-        vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
-        if any(vol_secrets):
-            req['spec']['containers'][0]['volumeMounts'] = (
-                req['spec']['containers'][0].get('volumeMounts', []))
-            req['spec']['volumes'] = (
-                req['spec'].get('volumes', []))
-        for idx, vol in enumerate(vol_secrets):
-            vol_id = 'secretvol' + str(idx)
-            req['spec']['containers'][0]['volumeMounts'].append({
-                'mountPath': vol.deploy_target,
-                'name': vol_id,
-                'readOnly': True
-            })
-            req['spec']['volumes'].append({
-                'name': vol_id,
-                'secret': {
-                    'secretName': vol.secret
-                }
-            })
-
-    @staticmethod
-    def extract_env_and_secrets(pod, req):
-        envs_from_key_secrets = [
-            env for env in pod.secrets if env.deploy_type == 'env' and env.key is not None
-        ]
-
-        if len(pod.envs) > 0 or len(envs_from_key_secrets) > 0 or len(pod.pod_runtime_info_envs) > 0:
-            env = []
-            for runtime_info in pod.pod_runtime_info_envs:
-                KubernetesRequestFactory.add_runtime_info_env(env, runtime_info)
-            for k in pod.envs.keys():
-                env.append({'name': k, 'value': pod.envs[k]})
-            for secret in envs_from_key_secrets:
-                KubernetesRequestFactory.add_secret_to_env(env, secret)
-
-            req['spec']['containers'][0]['env'] = env
-
-        KubernetesRequestFactory._apply_env_from(pod, req)
-
-    @staticmethod
-    def extract_resources(pod, req):
-        if not pod.resources or pod.resources.is_empty_resource_request():
-            return
-
-        req['spec']['containers'][0]['resources'] = {}
-
-        if pod.resources.has_requests():
-            req['spec']['containers'][0]['resources']['requests'] = {}
-            if pod.resources.request_memory:
-                req['spec']['containers'][0]['resources']['requests'][
-                    'memory'] = pod.resources.request_memory
-            if pod.resources.request_cpu:
-                req['spec']['containers'][0]['resources']['requests'][
-                    'cpu'] = pod.resources.request_cpu
-
-        if pod.resources.has_limits():
-            req['spec']['containers'][0]['resources']['limits'] = {}
-            if pod.resources.limit_memory:
-                req['spec']['containers'][0]['resources']['limits'][
-                    'memory'] = pod.resources.limit_memory
-            if pod.resources.limit_cpu:
-                req['spec']['containers'][0]['resources']['limits'][
-                    'cpu'] = pod.resources.limit_cpu
-            if pod.resources.limit_gpu:
-                req['spec']['containers'][0]['resources']['limits'][
-                    'nvidia.com/gpu'] = pod.resources.limit_gpu
-
-    @staticmethod
-    def extract_init_containers(pod, req):
-        if pod.init_containers:
-            req['spec']['initContainers'] = pod.init_containers
-
-    @staticmethod
-    def extract_service_account_name(pod, req):
-        if pod.service_account_name:
-            req['spec']['serviceAccountName'] = pod.service_account_name
-
-    @staticmethod
-    def extract_hostnetwork(pod, req):
-        if pod.hostnetwork:
-            req['spec']['hostNetwork'] = pod.hostnetwork
-
-    @staticmethod
-    def extract_dnspolicy(pod, req):
-        if pod.dnspolicy:
-            req['spec']['dnsPolicy'] = pod.dnspolicy
-
-    @staticmethod
-    def extract_image_pull_secrets(pod, req):
-        if pod.image_pull_secrets:
-            req['spec']['imagePullSecrets'] = [{
-                'name': pull_secret
-            } for pull_secret in pod.image_pull_secrets.split(',')]
-
-    @staticmethod
-    def extract_tolerations(pod, req):
-        if pod.tolerations:
-            req['spec']['tolerations'] = pod.tolerations
-
-    @staticmethod
-    def extract_security_context(pod, req):
-        if pod.security_context:
-            req['spec']['securityContext'] = pod.security_context
-
-    @staticmethod
-    def _apply_env_from(pod, req):
-        envs_from_secrets = [
-            env for env in pod.secrets if env.deploy_type == 'env' and env.key is None
-        ]
-
-        if pod.configmaps or envs_from_secrets:
-            req['spec']['containers'][0]['envFrom'] = []
-
-        for secret in envs_from_secrets:
-            req['spec']['containers'][0]['envFrom'].append(
-                {
-                    'secretRef': {
-                        'name': secret.secret
-                    }
-                }
-            )
-
-        for configmap in pod.configmaps:
-            req['spec']['containers'][0]['envFrom'].append(
-                {
-                    'configMapRef': {
-                        'name': configmap
-                    }
-                }
-            )
diff --git a/airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py
deleted file mode 100644
index 3790948..0000000
--- a/airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ /dev/null
@@ -1,139 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import yaml
-from airflow.kubernetes.pod import Pod
-from airflow.kubernetes.kubernetes_request_factory.kubernetes_request_factory \
-    import KubernetesRequestFactory
-
-
-class SimplePodRequestFactory(KubernetesRequestFactory):
-    """
-    Request generator for a pod.
-
-    """
-    _yaml = """apiVersion: v1
-kind: Pod
-metadata:
-  name: name
-spec:
-  containers:
-    - name: base
-      image: airflow-worker:latest
-      command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
-  restartPolicy: Never
-    """
-
-    def __init__(self):
-
-        pass
-
-    def create(self, pod):
-        # type: (Pod) -> dict
-        req = yaml.safe_load(self._yaml)
-        self.extract_name(pod, req)
-        self.extract_labels(pod, req)
-        self.extract_image(pod, req)
-        self.extract_image_pull_policy(pod, req)
-        self.extract_cmds(pod, req)
-        self.extract_args(pod, req)
-        self.extract_node_selector(pod, req)
-        self.extract_env_and_secrets(pod, req)
-        self.extract_volume_secrets(pod, req)
-        self.attach_ports(pod, req)
-        self.attach_volumes(pod, req)
-        self.attach_volume_mounts(pod, req)
-        self.extract_resources(pod, req)
-        self.extract_service_account_name(pod, req)
-        self.extract_init_containers(pod, req)
-        self.extract_image_pull_secrets(pod, req)
-        self.extract_annotations(pod, req)
-        self.extract_affinity(pod, req)
-        self.extract_hostnetwork(pod, req)
-        self.extract_tolerations(pod, req)
-        self.extract_security_context(pod, req)
-        self.extract_dnspolicy(pod, req)
-        return req
-
-
-class ExtractXcomPodRequestFactory(KubernetesRequestFactory):
-    """
-    Request generator for a pod with sidecar container.
-
-    """
-
-    XCOM_MOUNT_PATH = '/airflow/xcom'
-    SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
-    _yaml = """apiVersion: v1
-kind: Pod
-metadata:
-  name: name
-spec:
-  volumes:
-    - name: xcom
-      emptyDir: {{}}
-  containers:
-    - name: base
-      image: airflow-worker:latest
-      command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"]
-      volumeMounts:
-        - name: xcom
-          mountPath: {xcomMountPath}
-    - name: {sidecarContainerName}
-      image: alpine
-      command:
-        - sh
-        - -c
-        - 'trap "exit 0" INT; while true; do sleep 30; done;'
-      volumeMounts:
-        - name: xcom
-          mountPath: {xcomMountPath}
-      resources:
-        requests:
-          cpu: 1m
-  restartPolicy: Never
-    """.format(xcomMountPath=XCOM_MOUNT_PATH, sidecarContainerName=SIDECAR_CONTAINER_NAME)
-
-    def __init__(self):
-        pass
-
-    def create(self, pod):
-        # type: (Pod) -> dict
-        req = yaml.safe_load(self._yaml)
-        self.extract_name(pod, req)
-        self.extract_labels(pod, req)
-        self.extract_image(pod, req)
-        self.extract_image_pull_policy(pod, req)
-        self.extract_cmds(pod, req)
-        self.extract_args(pod, req)
-        self.extract_node_selector(pod, req)
-        self.extract_env_and_secrets(pod, req)
-        self.extract_volume_secrets(pod, req)
-        self.attach_ports(pod, req)
-        self.attach_volumes(pod, req)
-        self.attach_volume_mounts(pod, req)
-        self.extract_resources(pod, req)
-        self.extract_service_account_name(pod, req)
-        self.extract_init_containers(pod, req)
-        self.extract_image_pull_secrets(pod, req)
-        self.extract_annotations(pod, req)
-        self.extract_affinity(pod, req)
-        self.extract_hostnetwork(pod, req)
-        self.extract_tolerations(pod, req)
-        self.extract_security_context(pod, req)
-        self.extract_dnspolicy(pod, req)
-        return req
diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index a9f3dfe..53c9172 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -14,9 +14,16 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
 
+import copy
+import kubernetes.client.models as k8s
+from airflow.kubernetes.k8s_model import K8SModel
 
-class Resources:
+
+class Resources(K8SModel):
     __slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu')
 
     def __init__(
@@ -41,13 +48,20 @@ class Resources:
     def has_requests(self):
         return self.request_cpu is not None or self.request_memory is not None
 
-    def __str__(self):
-        return "Request: [cpu: {}, memory: {}], Limit: [cpu: {}, memory: {}, gpu: {}]".format(
-            self.request_cpu, self.request_memory, self.limit_cpu, self.limit_memory, self.limit_gpu
+    def to_k8s_client_obj(self):
+        return k8s.V1ResourceRequirements(
+            limits={'cpu': self.limit_cpu, 'memory': self.limit_memory, 'nvidia.com/gpu': self.limit_gpu},
+            requests={'cpu': self.request_cpu, 'memory': self.request_memory}
         )
 
+    def attach_to_pod(self, pod):
+        cp_pod = copy.deepcopy(pod)
+        resources = self.to_k8s_client_obj()
+        cp_pod.spec.containers[0].resources = resources
+        return cp_pod
+
 
-class Port:
+class Port(K8SModel):
     def __init__(
             self,
             name=None,
@@ -55,96 +69,12 @@ class Port:
         self.name = name
         self.container_port = container_port
 
+    def to_k8s_client_obj(self):
+        return k8s.V1ContainerPort(name=self.name, container_port=self.container_port)
 
-class Pod:
-    """
-    Represents a kubernetes pod and manages execution of a single pod.
-    :param image: The docker image
-    :type image: str
-    :param envs: A dict containing the environment variables
-    :type envs: dict
-    :param cmds: The command to be run on the pod
-    :type cmds: list[str]
-    :param secrets: Secrets to be launched to the pod
-    :type secrets: list[airflow.kubernetes.secret.Secret]
-    :param result: The result that will be returned to the operator after
-        successful execution of the pod
-    :type result: any
-    :param image_pull_policy: Specify a policy to cache or always pull an image
-    :type image_pull_policy: str
-    :param image_pull_secrets: Any image pull secrets to be given to the pod.
-        If more than one secret is required, provide a comma separated list:
-        secret_a,secret_b
-    :type image_pull_secrets: str
-    :param affinity: A dict containing a group of affinity scheduling rules
-    :type affinity: dict
-    :param hostnetwork: If True enable host networking on the pod
-    :type hostnetwork: bool
-    :param tolerations: A list of kubernetes tolerations
-    :type tolerations: list
-    :param security_context: A dict containing the security context for the pod
-    :type security_context: dict
-    :param configmaps: A list containing names of configmaps object
-        mounting env variables to the pod
-    :type configmaps: list[str]
-    :param pod_runtime_info_envs: environment variables about
-                                  pod runtime information (ip, namespace, nodeName, podName)
-    :type pod_runtime_info_envs: list[PodRuntimeEnv]
-    :param dnspolicy: Specify a dnspolicy for the pod
-    :type dnspolicy: str
-    """
-    def __init__(
-            self,
-            image,
-            envs,
-            cmds,
-            args=None,
-            secrets=None,
-            labels=None,
-            node_selectors=None,
-            name=None,
-            ports=None,
-            volumes=None,
-            volume_mounts=None,
-            namespace='default',
-            result=None,
-            image_pull_policy='IfNotPresent',
-            image_pull_secrets=None,
-            init_containers=None,
-            service_account_name=None,
-            resources=None,
-            annotations=None,
-            affinity=None,
-            hostnetwork=False,
-            tolerations=None,
-            security_context=None,
-            configmaps=None,
-            pod_runtime_info_envs=None,
-            dnspolicy=None
-    ):
-        self.image = image
-        self.envs = envs or {}
-        self.cmds = cmds
-        self.args = args or []
-        self.secrets = secrets or []
-        self.result = result
-        self.labels = labels or {}
-        self.name = name
-        self.ports = ports or []
-        self.volumes = volumes or []
-        self.volume_mounts = volume_mounts or []
-        self.node_selectors = node_selectors or {}
-        self.namespace = namespace
-        self.image_pull_policy = image_pull_policy
-        self.image_pull_secrets = image_pull_secrets
-        self.init_containers = init_containers
-        self.service_account_name = service_account_name
-        self.resources = resources or Resources()
-        self.annotations = annotations or {}
-        self.affinity = affinity or {}
-        self.hostnetwork = hostnetwork or False
-        self.tolerations = tolerations or []
-        self.security_context = security_context
-        self.configmaps = configmaps or []
-        self.pod_runtime_info_envs = pod_runtime_info_envs or []
-        self.dnspolicy = dnspolicy
+    def attach_to_pod(self, pod):
+        cp_pod = copy.deepcopy(pod)
+        port = self.to_k8s_client_obj()
+        cp_pod.spec.containers[0].ports = cp_pod.spec.containers[0].ports or []
+        cp_pod.spec.containers[0].ports.append(port)
+        return cp_pod
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 56b41d7..249a5ad 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -14,169 +14,335 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-from airflow.kubernetes.pod import Pod, Port
-from airflow.kubernetes.volume import Volume
-from airflow.kubernetes.volume_mount import VolumeMount
-
+"""
+This module provides an interface that accepts the previous Pod
+API and outputs a kubernetes.client.models.V1Pod.
+The advantage is that the full Kubernetes API
+is supported and no serialization needs to be written.
+"""
+
+import copy
+import kubernetes.client.models as k8s
+from airflow.executors import Executors
 import uuid
 
 
-class PodGenerator:
-    """Contains Kubernetes Airflow Worker configuration logic"""
-
-    def __init__(self, kube_config=None):
-        self.kube_config = kube_config
-        self.ports = []
-        self.volumes = []
-        self.volume_mounts = []
-        self.init_containers = []
-
-    def add_init_container(self,
-                           name,
-                           image,
-                           security_context,
-                           init_environment,
-                           volume_mounts
-                           ):
-        """
-
-        Adds an init container to the launched pod. useful for pre-
+class PodDefaults:
+    """
+    Static defaults for the PodGenerator
+    """
+    XCOM_MOUNT_PATH = '/airflow/xcom'
+    SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
+    XCOM_CMD = """import time
+while True:
+    try:
+        time.sleep(3600)
+    except KeyboardInterrupt:
+        exit(0)
+    """
+    VOLUME_MOUNT = k8s.V1VolumeMount(
+        name='xcom',
+        mount_path=XCOM_MOUNT_PATH
+    )
+    VOLUME = k8s.V1Volume(
+        name='xcom',
+        empty_dir=k8s.V1EmptyDirVolumeSource()
+    )
+    SIDECAR_CONTAINER = k8s.V1Container(
+        name=SIDECAR_CONTAINER_NAME,
+        command=['python', '-c', XCOM_CMD],
+        image='python:3.5-alpine',
+        volume_mounts=[VOLUME_MOUNT]
+    )
 
-        Args:
-            name (str):
-            image (str):
-            security_context (dict):
-            init_environment (dict):
-            volume_mounts (dict):
 
-        Returns:
+class PodGenerator:
+    """
+    Contains Kubernetes Airflow Worker configuration logic
+
+    Represents a kubernetes pod and manages execution of a single pod.
+    :param image: The docker image
+    :type image: str
+    :param envs: A dict containing the environment variables
+    :type envs: Dict[str, str]
+    :param cmds: The command to be run on the pod
+    :type cmds: List[str]
+    :param secrets: Secrets to be launched to the pod
+    :type secrets: List[airflow.kubernetes.models.secret.Secret]
+    :param image_pull_policy: Specify a policy to cache or always pull an image
+    :type image_pull_policy: str
+    :param image_pull_secrets: Any image pull secrets to be given to the pod.
+        If more than one secret is required, provide a comma separated list:
+        secret_a,secret_b
+    :type image_pull_secrets: str
+    :param affinity: A dict containing a group of affinity scheduling rules
+    :type affinity: dict
+    :param hostnetwork: If True enable host networking on the pod
+    :type hostnetwork: bool
+    :param tolerations: A list of kubernetes tolerations
+    :type tolerations: list
+    :param security_context: A dict containing the security context for the pod
+    :type security_context: dict
+    :param configmaps: Names of configmaps to reference from the base
+        container's envFrom. Provide a list of names, e.g.
+        ['configmap_a', 'configmap_b']
+    :type configmaps: List[str]
+    :param dnspolicy: Specify a dnspolicy for the pod
+    :type dnspolicy: str
+    :param pod: The fully specified pod.
+    :type pod: kubernetes.client.models.V1Pod
+    """
+
+    def __init__(
+        self,
+        image,
+        name=None,
+        namespace=None,
+        volume_mounts=None,
+        envs=None,
+        cmds=None,
+        args=None,
+        labels=None,
+        node_selectors=None,
+        ports=None,
+        volumes=None,
+        image_pull_policy='IfNotPresent',
+        restart_policy='Never',
+        image_pull_secrets=None,
+        init_containers=None,
+        service_account_name=None,
+        resources=None,
+        annotations=None,
+        affinity=None,
+        hostnetwork=False,
+        tolerations=None,
+        security_context=None,
+        configmaps=None,
+        dnspolicy=None,
+        pod=None,
+        extract_xcom=False,
+    ):
+        self.ud_pod = pod
+        self.pod = k8s.V1Pod()
+        self.pod.api_version = 'v1'
+        self.pod.kind = 'Pod'
+
+        # Pod Metadata
+        self.metadata = k8s.V1ObjectMeta()
+        self.metadata.labels = labels
+        self.metadata.name = name + "-" + str(uuid.uuid4())[:8] if name else None
+        self.metadata.namespace = namespace
+        self.metadata.annotations = annotations
+
+        # Pod Container
+        self.container = k8s.V1Container(name='base')
+        self.container.image = image
+        self.container.env = []
+
+        if envs:
+            if isinstance(envs, dict):
+                for key, val in envs.items():
+                    self.container.env.append(k8s.V1EnvVar(
+                        name=key,
+                        value=val
+                    ))
+            elif isinstance(envs, list):
+                self.container.env.extend(envs)
+
+        configmaps = configmaps or []
+        self.container.env_from = []
+        for configmap in configmaps:
+            self.container.env_from.append(k8s.V1EnvFromSource(
+                config_map_ref=k8s.V1ConfigMapEnvSource(
+                    name=configmap
+                )
+            ))
+
+        self.container.command = cmds or []
+        self.container.args = args or []
+        self.container.image_pull_policy = image_pull_policy
+        self.container.ports = ports or []
+        self.container.resources = resources
+        self.container.volume_mounts = volume_mounts or []
+
+        # Pod Spec
+        self.spec = k8s.V1PodSpec(containers=[])
+        self.spec.security_context = security_context
+        self.spec.tolerations = tolerations
+        self.spec.dns_policy = dnspolicy
+        self.spec.host_network = hostnetwork
+        self.spec.affinity = affinity
+        self.spec.service_account_name = service_account_name
+        self.spec.init_containers = init_containers
+        self.spec.volumes = volumes or []
+        self.spec.node_selector = node_selectors
+        self.spec.restart_policy = restart_policy
+
+        self.spec.image_pull_secrets = []
+
+        if image_pull_secrets:
+            for image_pull_secret in image_pull_secrets.split(','):
+                self.spec.image_pull_secrets.append(k8s.V1LocalObjectReference(
+                    name=image_pull_secret
+                ))
+
+        # Attach sidecar
+        self.extract_xcom = extract_xcom
+
+    def gen_pod(self):
+        result = self.ud_pod
+
+        if result is None:
+            result = self.pod
+            result.spec = self.spec
+            result.metadata = self.metadata
+            result.spec.containers = [self.container]
+
+        if self.extract_xcom:
+            result = self.add_sidecar(result)
+
+        return result
+
+    @staticmethod
+    def add_sidecar(pod):
+        pod_cp = copy.deepcopy(pod)
+
+        pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)
+        pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT)
+        pod_cp.spec.containers.append(PodDefaults.SIDECAR_CONTAINER)
+
+        return pod_cp
+
+    @staticmethod
+    def from_obj(obj):
+        if obj is None:
+            return k8s.V1Pod()
+
+        if isinstance(obj, PodGenerator):
+            return obj.gen_pod()
+
+        if not isinstance(obj, dict):
+            raise TypeError(
+                'Cannot convert a non-dictionary or non-PodGenerator '
+                'object into a KubernetesExecutorConfig')
+
+        namespaced = obj.get(Executors.KubernetesExecutor, {})
+
+        resources = namespaced.get('resources')
+
+        if resources is None:
+            requests = {
+                'cpu': namespaced.get('request_cpu'),
+                'memory': namespaced.get('request_memory')
 
-        """
-        self.init_containers.append(
-            {
-                'name': name,
-                'image': image,
-                'securityContext': security_context,
-                'env': init_environment,
-                'volumeMounts': volume_mounts
             }
-        )
-
-    def _get_init_containers(self):
-        return self.init_containers
-
-    def add_port(self, port):  # type: (Port) -> None
-        """
-        Adds a Port to the generator
-
-        :param port: ports for generated pod
-        :type port: airflow.kubernetes.pod.Port
-        """
-        self.ports.append({'name': port.name, 'containerPort': port.container_port})
-
-    def add_volume(self, volume):  # type: (Volume) -> None
-        """
-        Adds a Volume to the generator
-
-        :param volume: volume for generated pod
-        :type volume: airflow.kubernetes.volume.Volume
-        """
-
-        self._add_volume(name=volume.name, configs=volume.configs)
-
-    def _add_volume(self, name, configs):
-        """
-
-        Args:
-            name (str):
-            configs (dict): Configurations for the volume.
-            Could be used to define PersistentVolumeClaim, ConfigMap, etc...
-
-        Returns:
-
-        """
-        volume_map = {'name': name}
-        for k, v in configs.items():
-            volume_map[k] = v
-
-        self.volumes.append(volume_map)
-
-    def add_volume_with_configmap(self, name, config_map):
-        self.volumes.append(
-            {
-                'name': name,
-                'configMap': config_map
+            limits = {
+                'cpu': namespaced.get('limit_cpu'),
+                'memory': namespaced.get('limit_memory')
             }
+            all_resources = list(requests.values()) + list(limits.values())
+            if all(r is None for r in all_resources):
+                resources = None
+            else:
+                resources = k8s.V1ResourceRequirements(
+                    requests=requests,
+                    limits=limits
+                )
+
+        annotations = namespaced.get('annotations', {})
+        gcp_service_account_key = namespaced.get('gcp_service_account_key', None)
+
+        if annotations is not None and gcp_service_account_key is not None:
+            annotations.update({
+                'iam.cloud.google.com/service-account': gcp_service_account_key
+            })
+
+        pod_spec_generator = PodGenerator(
+            image=namespaced.get('image'),
+            envs=namespaced.get('env'),
+            cmds=namespaced.get('cmds'),
+            args=namespaced.get('args'),
+            labels=namespaced.get('labels'),
+            node_selectors=namespaced.get('node_selectors'),
+            name=namespaced.get('name'),
+            ports=namespaced.get('ports'),
+            volumes=namespaced.get('volumes'),
+            volume_mounts=namespaced.get('volume_mounts'),
+            namespace=namespaced.get('namespace'),
+            image_pull_policy=namespaced.get('image_pull_policy'),
+            restart_policy=namespaced.get('restart_policy'),
+            image_pull_secrets=namespaced.get('image_pull_secrets'),
+            init_containers=namespaced.get('init_containers'),
+            service_account_name=namespaced.get('service_account_name'),
+            resources=resources,
+            annotations=namespaced.get('annotations'),
+            affinity=namespaced.get('affinity'),
+            hostnetwork=namespaced.get('hostnetwork'),
+            tolerations=namespaced.get('tolerations'),
+            security_context=namespaced.get('security_context'),
+            configmaps=namespaced.get('configmaps'),
+            dnspolicy=namespaced.get('dnspolicy'),
+            pod=namespaced.get('pod'),
+            extract_xcom=namespaced.get('extract_xcom'),
         )
 
-    def _add_mount(self,
-                   name,
-                   mount_path,
-                   sub_path,
-                   read_only):
-        """
+        return pod_spec_generator.gen_pod()
 
-        Args:
-            name (str):
-            mount_path (str):
-            sub_path (str):
-            read_only:
-
-        Returns:
-
-        """
-
-        self.volume_mounts.append({
-            'name': name,
-            'mountPath': mount_path,
-            'subPath': sub_path,
-            'readOnly': read_only
-        })
-
-    def add_mount(self,
-                  volume_mount,  # type: VolumeMount
-                  ):
+    @staticmethod
+    def reconcile_pods(base_pod, client_pod):
         """
-        Adds a VolumeMount to the generator
-
-        :param volume_mount: volume for generated pod
-        :type volume_mount: airflow.kubernetes.volume_mount.VolumeMount
+        :param base_pod: has the base attributes which are overwritten if they exist
+            in the client pod and remain if they do not exist in the client_pod
+        :type base_pod: k8s.V1Pod
+        :param client_pod: the pod that the client wants to create.
+        :type client_pod: k8s.V1Pod
+        :return: the merged pods
+
+        This can't be done recursively as certain fields are preserved,
+        some overwritten, and some concatenated, e.g. The command
+        should be preserved from base, the volumes appended to and
+        the other fields overwritten.
         """
-        self._add_mount(
-            name=volume_mount.name,
-            mount_path=volume_mount.mount_path,
-            sub_path=volume_mount.sub_path,
-            read_only=volume_mount.read_only
-        )
 
-    def _get_volumes_and_mounts(self):
-        return self.volumes, self.volume_mounts
-
-    def _get_image_pull_secrets(self):
-        """Extracts any image pull secrets for fetching container(s)"""
-        if not self.kube_config.image_pull_secrets:
-            return []
-        return self.kube_config.image_pull_secrets.split(',')
-
-    def make_pod(self, namespace, image, pod_id, cmds, arguments, labels):
-        volumes, volume_mounts = self._get_volumes_and_mounts()
-        worker_init_container_spec = self._get_init_containers()
-
-        return Pod(
-            namespace=namespace,
-            name=pod_id + "-" + str(uuid.uuid4())[:8],
-            image=image,
-            cmds=cmds,
-            args=arguments,
-            labels=labels,
-            envs={},
-            secrets=[],
-            # service_account_name=self.kube_config.worker_service_account_name,
-            # image_pull_secrets=self.kube_config.image_pull_secrets,
-            init_containers=worker_init_container_spec,
-            ports=self.ports,
-            volumes=volumes,
-            volume_mounts=volume_mounts,
-            resources=None
-        )
+        client_pod_cp = copy.deepcopy(client_pod)
+
+        def merge_objects(base_obj, client_obj):
+            for base_key in base_obj.to_dict().keys():
+                base_val = getattr(base_obj, base_key, None)
+                if not getattr(client_obj, base_key, None) and base_val:
+                    setattr(client_obj, base_key, base_val)
+
+        def extend_object_field(base_obj, client_obj, field_name):
+            base_obj_field = getattr(base_obj, field_name, None)
+            client_obj_field = getattr(client_obj, field_name, None)
+            if not base_obj_field:
+                return
+            if not client_obj_field:
+                setattr(client_obj, field_name, base_obj_field)
+                return
+            appended_fields = base_obj_field + client_obj_field
+            setattr(client_obj, field_name, appended_fields)
+
+        # Values at the pod and metadata should be overwritten where they exist,
+        # but certain values at the spec and container level must be conserved.
+        base_container = base_pod.spec.containers[0]
+        client_container = client_pod_cp.spec.containers[0]
+
+        extend_object_field(base_container, client_container, 'volume_mounts')
+        extend_object_field(base_container, client_container, 'env')
+        extend_object_field(base_container, client_container, 'env_from')
+        extend_object_field(base_container, client_container, 'ports')
+        extend_object_field(base_container, client_container, 'volume_devices')
+        client_container.command = base_container.command
+        client_container.args = base_container.args
+        merge_objects(base_pod.spec.containers[0], client_pod_cp.spec.containers[0])
+        # Just append any additional containers from the base pod
+        client_pod_cp.spec.containers.extend(base_pod.spec.containers[1:])
+
+        merge_objects(base_pod.metadata, client_pod_cp.metadata)
+
+        extend_object_field(base_pod.spec, client_pod_cp.spec, 'volumes')
+        merge_objects(base_pod.spec, client_pod_cp.spec)
+        merge_objects(base_pod, client_pod_cp)
+
+        return client_pod_cp
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 8a9028b..47d8ed5 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -18,30 +18,22 @@
 import json
 import time
 from datetime import datetime as dt
-from typing import Tuple, Optional
-
-from requests.exceptions import BaseHTTPError
 
 import tenacity
-
 from kubernetes import watch, client
 from kubernetes.client.rest import ApiException
 from kubernetes.stream import stream as kubernetes_stream
+from requests.exceptions import BaseHTTPError
 
+from airflow import AirflowException
+from airflow.kubernetes.pod_generator import PodDefaults
 from airflow.settings import pod_mutation_hook
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
-from airflow import AirflowException
-
-from airflow.kubernetes.pod import Pod
-from airflow.kubernetes.kubernetes_request_factory import \
-    pod_request_factory as pod_factory
-
 from .kube_client import get_kube_client
 
 
-class PodStatus(object):
-    """Status of the PODs"""
+class PodStatus:
     PENDING = 'pending'
     RUNNING = 'running'
     FAILED = 'failed'
@@ -68,35 +60,36 @@ class PodLauncher(LoggingMixin):
                                                       cluster_context=cluster_context)
         self._watch = watch.Watch()
         self.extract_xcom = extract_xcom
-        self.kube_req_factory = pod_factory.ExtractXcomPodRequestFactory(
-        ) if extract_xcom else pod_factory.SimplePodRequestFactory()
 
     def run_pod_async(self, pod, **kwargs):
         """Runs POD asynchronously"""
         pod_mutation_hook(pod)
 
-        req = self.kube_req_factory.create(pod)
-        self.log.debug('Pod Creation Request: \n%s', json.dumps(req, indent=2))
+        sanitized_pod = self._client.api_client.sanitize_for_serialization(pod)
+        json_pod = json.dumps(sanitized_pod, indent=2)
+
+        self.log.debug('Pod Creation Request: \n%s', json_pod)
         try:
-            resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace, **kwargs)
+            resp = self._client.create_namespaced_pod(body=sanitized_pod,
+                                                      namespace=pod.metadata.namespace, **kwargs)
             self.log.debug('Pod Creation Response: %s', resp)
-        except ApiException:
-            self.log.exception('Exception when attempting to create Namespaced Pod.')
-            raise
+        except Exception as e:
+            self.log.exception('Exception when attempting '
+                               'to create Namespaced Pod: %s', json_pod)
+            raise e
         return resp
 
     def delete_pod(self, pod):
         """Deletes POD"""
         try:
             self._client.delete_namespaced_pod(
-                pod.name, pod.namespace, body=client.V1DeleteOptions())
+                pod.metadata.name, pod.metadata.namespace, body=client.V1DeleteOptions())
         except ApiException as e:
             # If the pod is already deleted
             if e.status != 404:
                 raise
 
     def run_pod(self, pod, startup_timeout=120, get_logs=True):
-        # type: (Pod, int, bool) -> Tuple[State, Optional[str]]
         """
         Launches the pod synchronously and waits for completion.
         Args:
@@ -117,7 +110,6 @@ class PodLauncher(LoggingMixin):
         return self._monitor_pod(pod, get_logs)
 
     def _monitor_pod(self, pod, get_logs):
-        # type: (Pod, bool) -> Tuple[State, Optional[str]]
 
         if get_logs:
             logs = self.read_pod_logs(pod)
@@ -126,13 +118,13 @@ class PodLauncher(LoggingMixin):
         result = None
         if self.extract_xcom:
             while self.base_container_is_running(pod):
-                self.log.info('Container %s has state %s', pod.name, State.RUNNING)
+                self.log.info('Container %s has state %s', pod.metadata.name, State.RUNNING)
                 time.sleep(2)
             result = self._extract_xcom(pod)
             self.log.info(result)
             result = json.loads(result)
         while self.pod_is_running(pod):
-            self.log.info('Pod %s has state %s', pod.name, State.RUNNING)
+            self.log.info('Pod %s has state %s', pod.metadata.name, State.RUNNING)
             time.sleep(2)
         return self._task_status(self.read_pod(pod)), result
 
@@ -158,6 +150,8 @@ class PodLauncher(LoggingMixin):
         event = self.read_pod(pod)
         status = next(iter(filter(lambda s: s.name == 'base',
                                   event.status.container_statuses)), None)
+        if not status:
+            return False
         return status.state.running is not None
 
     @tenacity.retry(
@@ -169,8 +163,8 @@ class PodLauncher(LoggingMixin):
         """Reads log from the POD"""
         try:
             return self._client.read_namespaced_pod_log(
-                name=pod.name,
-                namespace=pod.namespace,
+                name=pod.metadata.name,
+                namespace=pod.metadata.namespace,
                 container='base',
                 follow=True,
                 tail_lines=10,
@@ -189,7 +183,7 @@ class PodLauncher(LoggingMixin):
     def read_pod(self, pod):
         """Read POD information"""
         try:
-            return self._client.read_namespaced_pod(pod.name, pod.namespace)
+            return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
         except BaseHTTPError as e:
             raise AirflowException(
                 'There was an error reading the kubernetes API: {}'.format(e)
@@ -197,19 +191,19 @@ class PodLauncher(LoggingMixin):
 
     def _extract_xcom(self, pod):
         resp = kubernetes_stream(self._client.connect_get_namespaced_pod_exec,
-                                 pod.name, pod.namespace,
-                                 container=self.kube_req_factory.SIDECAR_CONTAINER_NAME,
+                                 pod.metadata.name, pod.metadata.namespace,
+                                 container=PodDefaults.SIDECAR_CONTAINER_NAME,
                                  command=['/bin/sh'], stdin=True, stdout=True,
                                  stderr=True, tty=False,
                                  _preload_content=False)
         try:
             result = self._exec_pod_command(
-                resp, 'cat {}/return.json'.format(self.kube_req_factory.XCOM_MOUNT_PATH))
+                resp, 'cat {}/return.json'.format(PodDefaults.XCOM_MOUNT_PATH))
             self._exec_pod_command(resp, 'kill -s SIGINT 1')
         finally:
             resp.close()
         if result is None:
-            raise AirflowException('Failed to extract xcom from pod: {}'.format(pod.name))
+            raise AirflowException('Failed to extract xcom from pod: {}'.format(pod.metadata.name))
         return result
 
     def _exec_pod_command(self, resp, command):
diff --git a/airflow/kubernetes/pod_runtime_info_env.py b/airflow/kubernetes/pod_runtime_info_env.py
index f52791e..7d23a7e 100644
--- a/airflow/kubernetes/pod_runtime_info_env.py
+++ b/airflow/kubernetes/pod_runtime_info_env.py
@@ -15,11 +15,15 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Classes for using Kubernetes Downward API
+Classes for interacting with Kubernetes API
 """
 
+import copy
+import kubernetes.client.models as k8s
+from airflow.kubernetes.k8s_model import K8SModel
 
-class PodRuntimeInfoEnv:
+
+class PodRuntimeInfoEnv(K8SModel):
     """Defines Pod runtime information as environment variable"""
 
     def __init__(self, name, field_path):
@@ -34,3 +38,23 @@ class PodRuntimeInfoEnv:
         """
         self.name = name
         self.field_path = field_path
+
+    def to_k8s_client_obj(self):
+        """
+        :return: kubernetes.client.models.V1EnvVar whose value is a field reference to ``self.field_path``
+        """
+        return k8s.V1EnvVar(
+            name=self.name,
+            value_from=k8s.V1EnvVarSource(
+                field_ref=k8s.V1ObjectFieldSelector(
+                    field_path=self.field_path  # keyword: the first positional parameter is api_version
+                )
+            )
+        )
+
+    def attach_to_pod(self, pod):
+        cp_pod = copy.deepcopy(pod)
+        env = self.to_k8s_client_obj()
+        cp_pod.spec.containers[0].env = cp_pod.spec.containers[0].env or []
+        cp_pod.spec.containers[0].env.append(env)
+        return cp_pod
diff --git a/airflow/kubernetes/secret.py b/airflow/kubernetes/secret.py
index fde1ded..8591a88 100644
--- a/airflow/kubernetes/secret.py
+++ b/airflow/kubernetes/secret.py
@@ -14,10 +14,18 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
+
+import uuid
+import copy
+import kubernetes.client.models as k8s
 from airflow.exceptions import AirflowConfigException
+from airflow.kubernetes.k8s_model import K8SModel
 
 
-class Secret(object):
+class Secret(K8SModel):
     """Defines Kubernetes Secret Volume"""
 
     def __init__(self, deploy_type, deploy_target, secret, key=None):
@@ -38,6 +46,9 @@ class Secret(object):
             if not provided in `deploy_type` `env` it will mount all secrets in object
         :type key: str or None
         """
+        if deploy_type not in ('env', 'volume'):
+            raise AirflowConfigException("deploy_type must be env or volume")
+
         self.deploy_type = deploy_type
         self.deploy_target = deploy_target
 
@@ -53,6 +64,56 @@ class Secret(object):
         self.secret = secret
         self.key = key
 
+    def to_env_secret(self):
+        return k8s.V1EnvVar(
+            name=self.deploy_target,
+            value_from=k8s.V1EnvVarSource(
+                secret_key_ref=k8s.V1SecretKeySelector(
+                    name=self.secret,
+                    key=self.key
+                )
+            )
+        )
+
+    def to_env_from_secret(self):
+        return k8s.V1EnvFromSource(
+            secret_ref=k8s.V1SecretEnvSource(name=self.secret)
+        )
+
+    def to_volume_secret(self):
+        vol_id = 'secretvol{}'.format(uuid.uuid4())
+        return (
+            k8s.V1Volume(
+                name=vol_id,
+                secret=k8s.V1SecretVolumeSource(
+                    secret_name=self.secret
+                )
+            ),
+            k8s.V1VolumeMount(
+                mount_path=self.deploy_target,
+                name=vol_id,
+                read_only=True
+            )
+        )
+
+    def attach_to_pod(self, pod):
+        cp_pod = copy.deepcopy(pod)  # all changes go to the copy; the caller's pod stays untouched
+        if self.deploy_type == 'volume':
+            volume, volume_mount = self.to_volume_secret()
+            cp_pod.spec.volumes = cp_pod.spec.volumes or []  # reuse the copy's list, not pod's
+            cp_pod.spec.volumes.append(volume)
+            cp_pod.spec.containers[0].volume_mounts = cp_pod.spec.containers[0].volume_mounts or []
+            cp_pod.spec.containers[0].volume_mounts.append(volume_mount)
+        if self.deploy_type == 'env' and self.key is not None:  # one key -> a single env var
+            env = self.to_env_secret()
+            cp_pod.spec.containers[0].env = cp_pod.spec.containers[0].env or []
+            cp_pod.spec.containers[0].env.append(env)
+        if self.deploy_type == 'env' and self.key is None:  # no key -> whole secret via env_from
+            env_from = self.to_env_from_secret()
+            cp_pod.spec.containers[0].env_from = cp_pod.spec.containers[0].env_from or []
+            cp_pod.spec.containers[0].env_from.append(env_from)
+        return cp_pod
+
     def __eq__(self, other):
         return (
             self.deploy_type == other.deploy_type and
diff --git a/airflow/kubernetes/volume.py b/airflow/kubernetes/volume.py
index 94003fe..9d85959 100644
--- a/airflow/kubernetes/volume.py
+++ b/airflow/kubernetes/volume.py
@@ -14,11 +14,15 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
 
+import copy
+from airflow.kubernetes.k8s_model import K8SModel
 
-class Volume:
-    """Defines Kubernetes Volume"""
 
+class Volume(K8SModel):
     def __init__(self, name, configs):
         """ Adds Kubernetes Volume to pod. allows pod to access features like ConfigMaps
         and Persistent Volumes
@@ -31,3 +35,15 @@ class Volume:
         """
         self.name = name
         self.configs = configs
+
+    def to_k8s_client_obj(self):
+        configs = copy.copy(self.configs)  # shallow copy so 'name' is not written into self.configs
+        configs['name'] = self.name
+        return configs
+
+    def attach_to_pod(self, pod):
+        cp_pod = copy.deepcopy(pod)  # all changes go to the copy; the caller's pod stays untouched
+        volume = self.to_k8s_client_obj()
+        cp_pod.spec.volumes = cp_pod.spec.volumes or []  # reuse the copy's list, not pod's
+        cp_pod.spec.volumes.append(volume)
+        return cp_pod
diff --git a/airflow/kubernetes/volume_mount.py b/airflow/kubernetes/volume_mount.py
index 4bdf09c..74563b4 100644
--- a/airflow/kubernetes/volume_mount.py
+++ b/airflow/kubernetes/volume_mount.py
@@ -14,9 +14,16 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""
+Classes for interacting with Kubernetes API
+"""
 
+import copy
+import kubernetes.client.models as k8s
+from airflow.kubernetes.k8s_model import K8SModel
 
-class VolumeMount:
+
+class VolumeMount(K8SModel):
     """Defines Kubernetes Volume Mount"""
 
     def __init__(self, name, mount_path, sub_path, read_only):
@@ -35,3 +42,18 @@ class VolumeMount:
         self.mount_path = mount_path
         self.sub_path = sub_path
         self.read_only = read_only
+
+    def to_k8s_client_obj(self):
+        return k8s.V1VolumeMount(
+            name=self.name,
+            mount_path=self.mount_path,
+            sub_path=self.sub_path,
+            read_only=self.read_only
+        )
+
+    def attach_to_pod(self, pod):
+        cp_pod = copy.deepcopy(pod)  # all changes go to the copy; the caller's pod stays untouched
+        volume_mount = self.to_k8s_client_obj()
+        cp_pod.spec.containers[0].volume_mounts = cp_pod.spec.containers[0].volume_mounts or []
+        cp_pod.spec.containers[0].volume_mounts.append(volume_mount)
+        return cp_pod
diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py
index 74e464a..41a65b3 100644
--- a/airflow/kubernetes/worker_configuration.py
+++ b/airflow/kubernetes/worker_configuration.py
@@ -16,10 +16,13 @@
 # under the License.
 
 import os
+
+import kubernetes.client.models as k8s
 import six
 
 from airflow.configuration import conf
-from airflow.kubernetes.pod import Pod, Resources
+from airflow.kubernetes.k8s_model import append_to_pod
+from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.kubernetes.secret import Secret
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.version import version as airflow_version
@@ -51,123 +54,120 @@ class WorkerConfiguration(LoggingMixin):
             return []
 
         # Otherwise, define a git-sync init container
-        init_environment = [{
-            'name': 'GIT_SYNC_REPO',
-            'value': self.kube_config.git_repo
-        }, {
-            'name': 'GIT_SYNC_BRANCH',
-            'value': self.kube_config.git_branch
-        }, {
-            'name': 'GIT_SYNC_ROOT',
-            'value': self.kube_config.git_sync_root
-        }, {
-            'name': 'GIT_SYNC_DEST',
-            'value': self.kube_config.git_sync_dest
-        }, {
-            'name': 'GIT_SYNC_DEPTH',
-            'value': '1'
-        }, {
-            'name': 'GIT_SYNC_ONE_TIME',
-            'value': 'true'
-        }, {
-            'name': 'GIT_SYNC_REV',
-            'value': self.kube_config.git_sync_rev
-        }]
-
+        init_environment = [k8s.V1EnvVar(
+            name='GIT_SYNC_REPO',
+            value=self.kube_config.git_repo
+        ), k8s.V1EnvVar(
+            name='GIT_SYNC_BRANCH',
+            value=self.kube_config.git_branch
+        ), k8s.V1EnvVar(
+            name='GIT_SYNC_ROOT',
+            value=self.kube_config.git_sync_root
+        ), k8s.V1EnvVar(
+            name='GIT_SYNC_DEST',
+            value=self.kube_config.git_sync_dest
+        ), k8s.V1EnvVar(
+            name='GIT_SYNC_DEPTH',
+            value='1'
+        ), k8s.V1EnvVar(
+            name='GIT_SYNC_ONE_TIME',
+            value='true'
+        ), k8s.V1EnvVar(
+            name='GIT_SYNC_REV',
+            value=self.kube_config.git_sync_rev
+        )]
         for env_var_name, env_var_val in six.iteritems(self.kube_config.kube_env_vars):
-            init_environment.append({
-                'name': env_var_name,
-                'value': env_var_val
-            })
-
+            init_environment.append(k8s.V1EnvVar(
+                name=env_var_name,
+                value=env_var_val
+            ))
         if self.kube_config.git_user:
-            init_environment.append({
-                'name': 'GIT_SYNC_USERNAME',
-                'value': self.kube_config.git_user
-            })
+            init_environment.append(k8s.V1EnvVar(
+                name='GIT_SYNC_USERNAME',
+                value=self.kube_config.git_user
+            ))
         if self.kube_config.git_password:
-            init_environment.append({
-                'name': 'GIT_SYNC_PASSWORD',
-                'value': self.kube_config.git_password
-            })
+            init_environment.append(k8s.V1EnvVar(
+                name='GIT_SYNC_PASSWORD',
+                value=self.kube_config.git_password
+            ))
+
+        volume_mounts = [k8s.V1VolumeMount(
+            mount_path=self.kube_config.git_sync_root,
+            name=self.dags_volume_name,
+            read_only=False
+        )]
 
         if self.kube_config.git_sync_credentials_secret:
             init_environment.extend([
-                {
-                    'name': 'GIT_SYNC_USERNAME',
-                    'valueFrom': {
-                        'secretKeyRef': {
-                            'name': self.kube_config.git_sync_credentials_secret,
-                            'key': 'GIT_SYNC_USERNAME'
-                        }
-                    }
-                },
-                {
-                    'name': 'GIT_SYNC_PASSWORD',
-                    'valueFrom': {
-                        'secretKeyRef': {
-                            'name': self.kube_config.git_sync_credentials_secret,
-                            'key': 'GIT_SYNC_PASSWORD'
-                        }
-                    }
-                }
+                k8s.V1EnvVar(
+                    name='GIT_SYNC_USERNAME',
+                    value_from=k8s.V1EnvVarSource(
+                        secret_key_ref=k8s.V1SecretKeySelector(
+                            name=self.kube_config.git_sync_credentials_secret,
+                            key='GIT_SYNC_USERNAME')
+                    )
+                ),
+                k8s.V1EnvVar(
+                    name='GIT_SYNC_PASSWORD',
+                    value_from=k8s.V1EnvVarSource(
+                        secret_key_ref=k8s.V1SecretKeySelector(
+                            name=self.kube_config.git_sync_credentials_secret,
+                            key='GIT_SYNC_PASSWORD')
+                    )
+                )
             ])
 
-        volume_mounts = [{
-            'mountPath': self.kube_config.git_sync_root,
-            'name': self.dags_volume_name,
-            'readOnly': False
-        }]
         if self.kube_config.git_ssh_key_secret_name:
-            volume_mounts.append({
-                'name': self.git_sync_ssh_secret_volume_name,
-                'mountPath': '/etc/git-secret/ssh',
-                'subPath': 'ssh'
-            })
-            init_environment.extend([
-                {
-                    'name': 'GIT_SSH_KEY_FILE',
-                    'value': '/etc/git-secret/ssh'
-                },
-                {
-                    'name': 'GIT_SYNC_SSH',
-                    'value': 'true'
-                }])
-        if self.kube_config.git_ssh_known_hosts_configmap_name:
-            volume_mounts.append({
-                'name': self.git_sync_ssh_known_hosts_volume_name,
-                'mountPath': '/etc/git-secret/known_hosts',
-                'subPath': 'known_hosts'
-            })
+            volume_mounts.append(k8s.V1VolumeMount(
+                name=self.git_sync_ssh_secret_volume_name,
+                mount_path='/etc/git-secret/ssh',
+                sub_path='ssh'
+            ))
+
             init_environment.extend([
-                {
-                    'name': 'GIT_KNOWN_HOSTS',
-                    'value': 'true'
-                },
-                {
-                    'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
-                    'value': '/etc/git-secret/known_hosts'
-                }
+                k8s.V1EnvVar(
+                    name='GIT_SSH_KEY_FILE',
+                    value='/etc/git-secret/ssh'
+                ),
+                k8s.V1EnvVar(
+                    name='GIT_SYNC_SSH',
+                    value='true'
+                )
             ])
+
+        if self.kube_config.git_ssh_known_hosts_configmap_name:
+            volume_mounts.append(k8s.V1VolumeMount(
+                name=self.git_sync_ssh_known_hosts_volume_name,
+                mount_path='/etc/git-secret/known_hosts',
+                sub_path='known_hosts'
+            ))
+            init_environment.extend([k8s.V1EnvVar(
+                name='GIT_KNOWN_HOSTS',
+                value='true'
+            ), k8s.V1EnvVar(
+                name='GIT_SSH_KNOWN_HOSTS_FILE',
+                value='/etc/git-secret/known_hosts'
+            )])
         else:
-            init_environment.append({
-                'name': 'GIT_KNOWN_HOSTS',
-                'value': 'false'
-            })
-
-        init_containers = [{
-            'name': self.kube_config.git_sync_init_container_name,
-            'image': self.kube_config.git_sync_container,
-            'env': init_environment,
-            'volumeMounts': volume_mounts
-        }]
+            init_environment.append(k8s.V1EnvVar(
+                name='GIT_KNOWN_HOSTS',
+                value='false'
+            ))
+
+        init_containers = k8s.V1Container(
+            name=self.kube_config.git_sync_init_container_name,
+            image=self.kube_config.git_sync_container,
+            env=init_environment,
+            volume_mounts=volume_mounts
+        )
 
         if self.kube_config.git_sync_run_as_user != "":
-            init_containers[0]['securityContext'] = {
-                'runAsUser': self.kube_config.git_sync_run_as_user  # git-sync user
-            }
+            init_containers.security_context = k8s.V1SecurityContext(
+                run_as_user=self.kube_config.git_sync_run_as_user or 65533
+            )  # git-sync user
 
-        return init_containers
+        return [init_containers]
 
     def _get_environment(self):
         """Defines any necessary environment variables for the pod executor"""
@@ -196,9 +196,39 @@ class WorkerConfiguration(LoggingMixin):
 
     def _get_configmaps(self):
         """Extracts any configmapRefs to envFrom"""
-        if not self.kube_config.env_from_configmap_ref:
-            return []
-        return self.kube_config.env_from_configmap_ref.split(',')
+        env_from = []
+
+        if self.kube_config.env_from_configmap_ref:
+            for config_map_ref in self.kube_config.env_from_configmap_ref.split(','):
+                env_from.append(
+                    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(config_map_ref))
+                )
+
+        if self.kube_config.env_from_secret_ref:
+            for secret_ref in self.kube_config.env_from_secret_ref.split(','):
+                env_from.append(
+                    k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(secret_ref))
+                )
+
+        return env_from
+
+    def _get_env_from(self):
+        """Extracts any configmapRefs and secretRefs (comma-separated in kube_config) to envFrom"""
+        env_from = []
+
+        if self.kube_config.env_from_configmap_ref:
+            for config_map_ref in self.kube_config.env_from_configmap_ref.split(','):
+                env_from.append(
+                    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(config_map_ref))
+                )
+
+        if self.kube_config.env_from_secret_ref:
+            for secret_ref in self.kube_config.env_from_secret_ref.split(','):
+                env_from.append(
+                    k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(secret_ref))
+                )
+
+        return env_from
 
     def _get_secrets(self):
         """Defines any necessary secrets for the pod executor"""
@@ -222,21 +252,23 @@ class WorkerConfiguration(LoggingMixin):
         """Extracts any image pull secrets for fetching container(s)"""
         if not self.kube_config.image_pull_secrets:
             return []
-        return self.kube_config.image_pull_secrets.split(',')
+        pull_secrets = self.kube_config.image_pull_secrets.split(',')
+        return list(map(lambda name: k8s.V1LocalObjectReference(name), pull_secrets))
 
     def _get_security_context(self):
         """Defines the security context"""
-        security_context = {}
+
+        security_context = k8s.V1PodSecurityContext()
 
         if self.kube_config.worker_run_as_user != "":
-            security_context['runAsUser'] = self.kube_config.worker_run_as_user
+            security_context.run_as_user = self.kube_config.worker_run_as_user
 
         if self.kube_config.worker_fs_group != "":
-            security_context['fsGroup'] = self.kube_config.worker_fs_group
+            security_context.fs_group = self.kube_config.worker_fs_group
 
         # set fs_group to 65533 if not explicitly specified and using git ssh keypair auth
-        if self.kube_config.git_ssh_key_secret_name and security_context.get('fsGroup') is None:
-            security_context['fsGroup'] = 65533
+        if self.kube_config.git_ssh_key_secret_name and security_context.fs_group is None:
+            security_context.fs_group = 65533
 
         return security_context
 
@@ -246,22 +278,77 @@ class WorkerConfiguration(LoggingMixin):
         copy.update(labels)
         return copy
 
-    def _get_volumes_and_mounts(self):
+    def _get_volume_mounts(self):
+        volume_mounts = {
+            self.dags_volume_name: k8s.V1VolumeMount(
+                name=self.dags_volume_name,
+                mount_path=self.generate_dag_volume_mount_path(),
+                read_only=True,
+            ),
+            self.logs_volume_name: k8s.V1VolumeMount(
+                name=self.logs_volume_name,
+                mount_path=self.worker_airflow_logs,
+            )
+        }
+
+        if self.kube_config.dags_volume_subpath:
+            volume_mounts[self.dags_volume_name].sub_path = self.kube_config.dags_volume_subpath
+
+        if self.kube_config.logs_volume_subpath:
+            volume_mounts[self.logs_volume_name].sub_path = self.kube_config.logs_volume_subpath
+
+        if self.kube_config.dags_in_image:
+            del volume_mounts[self.dags_volume_name]
+
+        # Mount the airflow.cfg file via a configmap the user has specified
+        if self.kube_config.airflow_configmap:
+            config_volume_name = 'airflow-config'
+            config_path = '{}/airflow.cfg'.format(self.worker_airflow_home)
+            volume_mounts[config_volume_name] = k8s.V1VolumeMount(
+                name=config_volume_name,
+                mount_path=config_path,
+                sub_path='airflow.cfg',
+                read_only=True
+            )
+        if self.kube_config.airflow_local_settings_configmap:
+            config_path = '{}/config/airflow_local_settings.py'.format(self.worker_airflow_home)
+
+            if self.kube_config.airflow_local_settings_configmap != self.kube_config.airflow_configmap:
+                config_volume_name = 'airflow-local-settings'
+
+                volume_mounts[config_volume_name] = k8s.V1VolumeMount(
+                    name=config_volume_name,
+                    mount_path=config_path,
+                    sub_path='airflow_local_settings.cfg',
+                    read_only=True
+                )
+
+            else:
+                volume_mounts['airflow-local-settings'] = k8s.V1VolumeMount(
+                    name='airflow-config',
+                    mount_path=config_path,
+                    sub_path='airflow_local_settings.cfg',
+                    read_only=True
+                )
+
+        return list(volume_mounts.values())
+
+    def _get_volumes(self):
         def _construct_volume(name, claim, host):
-            volume = {
-                'name': name
-            }
+            volume = k8s.V1Volume(name=name)
+
             if claim:
-                volume['persistentVolumeClaim'] = {
-                    'claimName': claim
-                }
+                volume.persistent_volume_claim = k8s.V1PersistentVolumeClaimVolumeSource(
+                    claim_name=claim
+                )
             elif host:
-                volume['hostPath'] = {
-                    'path': host,
-                    'type': ''
-                }
+                volume.host_path = k8s.V1HostPathVolumeSource(
+                    path=host,
+                    type=''
+                )
             else:
-                volume['emptyDir'] = {}
+                volume.empty_dir = {}
+
             return volume
 
         volumes = {
@@ -277,135 +364,68 @@ class WorkerConfiguration(LoggingMixin):
             )
         }
 
-        volume_mounts = {
-            self.dags_volume_name: {
-                'name': self.dags_volume_name,
-                'mountPath': self.generate_dag_volume_mount_path(),
-                'readOnly': True,
-            },
-            self.logs_volume_name: {
-                'name': self.logs_volume_name,
-                'mountPath': self.worker_airflow_logs,
-            }
-        }
-
-        if self.kube_config.dags_volume_subpath:
-            volume_mounts[self.dags_volume_name]['subPath'] = self.kube_config.dags_volume_subpath
-
-        if self.kube_config.logs_volume_subpath:
-            volume_mounts[self.logs_volume_name]['subPath'] = self.kube_config.logs_volume_subpath
-
         if self.kube_config.dags_in_image:
             del volumes[self.dags_volume_name]
-            del volume_mounts[self.dags_volume_name]
 
         # Get the SSH key from secrets as a volume
         if self.kube_config.git_ssh_key_secret_name:
-            volumes[self.git_sync_ssh_secret_volume_name] = {
-                'name': self.git_sync_ssh_secret_volume_name,
-                'secret': {
-                    'secretName': self.kube_config.git_ssh_key_secret_name,
-                    'items': [{
-                        'key': self.git_ssh_key_secret_key,
-                        'path': 'ssh',
-                        'mode': 0o440
-                    }]
-                }
-            }
+            volumes[self.git_sync_ssh_secret_volume_name] = k8s.V1Volume(
+                name=self.git_sync_ssh_secret_volume_name,
+                secret=k8s.V1SecretVolumeSource(
+                    secret_name=self.kube_config.git_ssh_key_secret_name,
+                    items=[k8s.V1KeyToPath(
+                        key=self.git_ssh_key_secret_key,
+                        path='ssh',
+                        mode=0o440
+                    )]
+                )
+            )
 
         if self.kube_config.git_ssh_known_hosts_configmap_name:
-            volumes[self.git_sync_ssh_known_hosts_volume_name] = {
-                'name': self.git_sync_ssh_known_hosts_volume_name,
-                'configMap': {
-                    'name': self.kube_config.git_ssh_known_hosts_configmap_name
-                },
-                'mode': 0o440
-            }
+            volumes[self.git_sync_ssh_known_hosts_volume_name] = k8s.V1Volume(
+                name=self.git_sync_ssh_known_hosts_volume_name,
+                config_map=k8s.V1ConfigMapVolumeSource(
+                    name=self.kube_config.git_ssh_known_hosts_configmap_name,
+                    default_mode=0o440
+                )
+            )
 
-        if self.kube_config.airflow_local_settings_configmap:
-            config_path = '{}/config/airflow_local_settings.py'.format(self.worker_airflow_home)
+        # Mount the airflow.cfg file via a configmap the user has specified
+        if self.kube_config.airflow_configmap:
+            config_volume_name = 'airflow-config'
+            volumes[config_volume_name] = k8s.V1Volume(
+                name=config_volume_name,
+                config_map=k8s.V1ConfigMapVolumeSource(
+                    name=self.kube_config.airflow_configmap
+                )
+            )
 
+        if self.kube_config.airflow_local_settings_configmap:
             if self.kube_config.airflow_local_settings_configmap != self.kube_config.airflow_configmap:
                 config_volume_name = 'airflow-local-settings'
-                volumes[config_volume_name] = {
-                    'name': config_volume_name,
-                    'configMap': {
-                        'name': self.kube_config.airflow_local_settings_configmap
-                    }
-                }
-
-                volume_mounts[config_volume_name] = {
-                    'name': config_volume_name,
-                    'mountPath': config_path,
-                    'subPath': 'airflow_local_settings.py',
-                    'readOnly': True
-                }
+                volumes[config_volume_name] = k8s.V1Volume(
+                    name=config_volume_name,
+                    config_map=k8s.V1ConfigMapVolumeSource(
+                        name=self.kube_config.airflow_local_settings_configmap
+                    )
+                )
 
-            else:
-                volume_mounts['airflow-local-settings'] = {
-                    'name': 'airflow-config',
-                    'mountPath': config_path,
-                    'subPath': 'airflow_local_settings.py',
-                    'readOnly': True
-                }
-
-        # Mount the airflow.cfg file via a configmap the user has specified
-        if self.kube_config.airflow_configmap:
-            config_volume_name = 'airflow-config'
-            config_path = '{}/airflow.cfg'.format(self.worker_airflow_home)
-            volumes[config_volume_name] = {
-                'name': config_volume_name,
-                'configMap': {
-                    'name': self.kube_config.airflow_configmap
-                }
-            }
-            volume_mounts[config_volume_name] = {
-                'name': config_volume_name,
-                'mountPath': config_path,
-                'subPath': 'airflow.cfg',
-                'readOnly': True
-            }
-
-        return volumes, volume_mounts
+        return list(volumes.values())
 
     def generate_dag_volume_mount_path(self):
         if self.kube_config.dags_volume_claim or self.kube_config.dags_volume_host:
-            dag_volume_mount_path = self.worker_airflow_dags
-        else:
-            dag_volume_mount_path = self.kube_config.git_dags_folder_mount_point
+            return self.worker_airflow_dags
 
-        return dag_volume_mount_path
+        return self.kube_config.git_dags_folder_mount_point
 
     def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_date,
-                 try_number, airflow_command, kube_executor_config):
-        volumes_dict, volume_mounts_dict = self._get_volumes_and_mounts()
-        worker_init_container_spec = self._get_init_containers()
-        resources = Resources(
-            request_memory=kube_executor_config.request_memory,
-            request_cpu=kube_executor_config.request_cpu,
-            limit_memory=kube_executor_config.limit_memory,
-            limit_cpu=kube_executor_config.limit_cpu,
-            limit_gpu=kube_executor_config.limit_gpu
-        )
-        gcp_sa_key = kube_executor_config.gcp_service_account_key
-        annotations = dict(kube_executor_config.annotations) or self.kube_config.kube_annotations
-        if gcp_sa_key:
-            annotations['iam.cloud.google.com/service-account'] = gcp_sa_key
-
-        volumes = [value for value in volumes_dict.values()] + kube_executor_config.volumes
-        volume_mounts = [value for value in volume_mounts_dict.values()] + kube_executor_config.volume_mounts
-
-        affinity = kube_executor_config.affinity or self.kube_config.kube_affinity
-        tolerations = kube_executor_config.tolerations or self.kube_config.kube_tolerations
-
-        return Pod(
+                 try_number, airflow_command):
+        pod_generator = PodGenerator(
             namespace=namespace,
             name=pod_id,
-            image=kube_executor_config.image or self.kube_config.kube_image,
-            image_pull_policy=(kube_executor_config.image_pull_policy or
-                               self.kube_config.kube_image_pull_policy),
-            cmds=airflow_command,
-            labels=self._get_labels(kube_executor_config.labels, {
+            image=self.kube_config.kube_image,
+            image_pull_policy=self.kube_config.kube_image_pull_policy,
+            labels={
                 'airflow-worker': worker_uuid,
                 'dag_id': dag_id,
                 'task_id': task_id,
@@ -413,20 +433,22 @@ class WorkerConfiguration(LoggingMixin):
                 'try_number': str(try_number),
                 'airflow_version': airflow_version.replace('+', '-'),
                 'kubernetes_executor': 'True',
-            }),
+            },
+            cmds=airflow_command,
+            volumes=self._get_volumes(),
+            volume_mounts=self._get_volume_mounts(),
+            init_containers=self._get_init_containers(),
+            annotations=self.kube_config.kube_annotations,
+            affinity=self.kube_config.kube_affinity,
+            tolerations=self.kube_config.kube_tolerations,
             envs=self._get_environment(),
-            secrets=self._get_secrets(),
+            node_selectors=self.kube_config.kube_node_selectors,
             service_account_name=self.kube_config.worker_service_account_name,
-            image_pull_secrets=self.kube_config.image_pull_secrets,
-            init_containers=worker_init_container_spec,
-            volumes=volumes,
-            volume_mounts=volume_mounts,
-            resources=resources,
-            annotations=annotations,
-            node_selectors=(kube_executor_config.node_selectors or
-                            self.kube_config.kube_node_selectors),
-            affinity=affinity,
-            tolerations=tolerations,
-            security_context=self._get_security_context(),
-            configmaps=self._get_configmaps()
         )
+
+        pod = pod_generator.gen_pod()
+        pod.spec.containers[0].env_from = pod.spec.containers[0].env_from or []
+        pod.spec.containers[0].env_from.extend(self._get_env_from())
+        pod.spec.security_context = self._get_security_context()
+
+        return append_to_pod(pod, self._get_secrets())
diff --git a/airflow/settings.py b/airflow/settings.py
index 6f82e5d..bf2fb1d 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -182,8 +182,9 @@ def policy(task_instance):
 
 def pod_mutation_hook(pod):
     """
-    This setting allows altering ``Pod`` objects before they are passed to
-    the Kubernetes client by the ``PodLauncher`` for scheduling.
+    This setting allows altering ``kubernetes.client.models.V1Pod`` objects
+    before they are passed to the Kubernetes client by the ``PodLauncher``
+    for scheduling.
 
     To define a pod mutation hook, add a ``airflow_local_settings`` module
     to your PYTHONPATH that defines this ``pod_mutation_hook`` function.
diff --git a/scripts/ci/in_container/kubernetes/app/deploy_app.sh b/scripts/ci/in_container/kubernetes/app/deploy_app.sh
index 6ebc7b0..1f7832d 100755
--- a/scripts/ci/in_container/kubernetes/app/deploy_app.sh
+++ b/scripts/ci/in_container/kubernetes/app/deploy_app.sh
@@ -104,13 +104,17 @@ cat "${BUILD_DIRNAME}/configmaps.yaml"
 kubectl delete -f "${MY_DIR}/postgres.yaml" || true
 kubectl delete -f "${BUILD_DIRNAME}/airflow.yaml" || true
 kubectl delete -f "${MY_DIR}/secrets.yaml" || true
+kubectl apply -f "${MY_DIR}/secrets.yaml" -n test-namespace
 
 set -e
 
 kubectl apply -f "${MY_DIR}/secrets.yaml"
+kubectl apply -f "${MY_DIR}/secrets.yaml" -n test-namespace
 kubectl apply -f "${BUILD_DIRNAME}/configmaps.yaml"
+kubectl apply -f "${BUILD_DIRNAME}/configmaps.yaml" -n test-namespace
 kubectl apply -f "${MY_DIR}/postgres.yaml"
 kubectl apply -f "${MY_DIR}/volumes.yaml"
+kubectl apply -f "${MY_DIR}/volumes.yaml" -n test-namespace
 
 set +x
 set +o pipefail
diff --git a/scripts/ci/in_container/kubernetes/app/secrets.yaml b/scripts/ci/in_container/kubernetes/app/secrets.yaml
index fa8ae16..34571ed 100644
--- a/scripts/ci/in_container/kubernetes/app/secrets.yaml
+++ b/scripts/ci/in_container/kubernetes/app/secrets.yaml
@@ -22,5 +22,5 @@ metadata:
 type: Opaque
 data:
   # The sql_alchemy_conn value is a base64 encoded representation of this connection string:
-  # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
-  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=
+  # postgresql+psycopg2://root:root@postgres-airflow.default.svc.cluster.local:5432/airflow
+  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93LmRlZmF1bHQuc3ZjLmNsdXN0ZXIubG9jYWw6NTQzMi9haXJmbG93 # yamllint disable-line
diff --git a/scripts/ci/in_container/kubernetes/app/volumes.yaml b/scripts/ci/in_container/kubernetes/app/volumes.yaml
index 5d366a1..0e3af06 100644
--- a/scripts/ci/in_container/kubernetes/app/volumes.yaml
+++ b/scripts/ci/in_container/kubernetes/app/volumes.yaml
@@ -21,7 +21,7 @@ metadata:
   name: airflow-dags
 spec:
   accessModes:
-    - ReadWriteOnce
+    - ReadWriteMany
   capacity:
     storage: 2Gi
   hostPath:
@@ -36,7 +36,7 @@ spec:
     - ReadWriteMany
   resources:
     requests:
-      storage: 2Gi
+      storage: 2G
 ---
 kind: PersistentVolume
 apiVersion: v1
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 95333fc..9073493 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -16,19 +16,16 @@
 # under the License.
 #
 
-import uuid
+import random
 import re
 import string
-import random
-
-from airflow.utils import timezone
-from urllib3 import HTTPResponse
 from datetime import datetime
 
 import six
+from urllib3 import HTTPResponse
 
+from airflow.utils import timezone
 from tests.compat import mock
-from tests.test_utils.config import conf_vars
 
 try:
     from kubernetes.client.rest import ApiException
@@ -36,15 +33,8 @@ try:
     from airflow.configuration import conf  # noqa: F401
     from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
     from airflow.executors.kubernetes_executor import KubernetesExecutor
-    from airflow.executors.kubernetes_executor import KubeConfig
-    from airflow.executors.kubernetes_executor import KubernetesExecutorConfig
-    from airflow.kubernetes.worker_configuration import WorkerConfiguration
-    from airflow.exceptions import AirflowConfigException
     from airflow.utils.state import State
-    from airflow.version import version as airflow_version
-    from airflow.kubernetes.secret import Secret
-except ImportError as e:
-    print(e)
+except ImportError:
     AirflowKubernetesScheduler = None  # type: ignore
 
 if six.PY2:
@@ -130,852 +120,6 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
         self.assertEqual(datetime_obj, new_datetime_obj)
 
 
-class TestKubernetesWorkerConfiguration(unittest.TestCase):
-    """
-    Tests that if dags_volume_subpath/logs_volume_subpath configuration
-    options are passed to worker pod config
-    """
-
-    affinity_config = {
-        'podAntiAffinity': {
-            'requiredDuringSchedulingIgnoredDuringExecution': [
-                {
-                    'topologyKey': 'kubernetes.io/hostname',
-                    'labelSelector': {
-                        'matchExpressions': [
-                            {
-                                'key': 'app',
-                                'operator': 'In',
-                                'values': ['airflow']
-                            }
-                        ]
-                    }
-                }
-            ]
-        }
-    }
-
-    tolerations_config = [
-        {
-            'key': 'dedicated',
-            'operator': 'Equal',
-            'value': 'airflow'
-        },
-        {
-            'key': 'prod',
-            'operator': 'Exists'
-        }
-    ]
-
-    def setUp(self):
-        if AirflowKubernetesScheduler is None:
-            self.skipTest("kubernetes python package is not installed")
-
-        self.resources = mock.patch(
-            'airflow.kubernetes.worker_configuration.Resources'
-        )
-
-        for patcher in [self.resources]:
-            self.mock_foo = patcher.start()
-            self.addCleanup(patcher.stop)
-
-        self.kube_config = mock.MagicMock()
-        self.kube_config.airflow_home = '/'
-        self.kube_config.airflow_dags = 'dags'
-        self.kube_config.airflow_dags = 'logs'
-        self.kube_config.dags_volume_subpath = None
-        self.kube_config.logs_volume_subpath = None
-        self.kube_config.dags_in_image = False
-        self.kube_config.dags_folder = None
-        self.kube_config.git_dags_folder_mount_point = None
-        self.kube_config.kube_labels = {'dag_id': 'original_dag_id', 'my_label': 'label_id'}
-
-    def tearDown(self):
-        self.kube_config = None
-
-    def test_worker_configuration_no_subpaths(self):
-        worker_config = WorkerConfiguration(self.kube_config)
-        volumes, volume_mounts = worker_config._get_volumes_and_mounts()
-        volumes_list = [value for value in volumes.values()]
-        volume_mounts_list = [value for value in volume_mounts.values()]
-        for volume_or_mount in volumes_list + volume_mounts_list:
-            if volume_or_mount['name'] not in ['airflow-config', 'airflow-local-settings']:
-                self.assertNotIn(
-                    'subPath', volume_or_mount,
-                    "subPath shouldn't be defined"
-                )
-
-    @conf_vars({
-        ('kubernetes', 'git_ssh_known_hosts_configmap_name'): 'airflow-configmap',
-        ('kubernetes', 'git_ssh_key_secret_name'): 'airflow-secrets',
-        ('kubernetes', 'git_user'): 'some-user',
-        ('kubernetes', 'git_password'): 'some-password',
-        ('kubernetes', 'git_repo'): 'git@github.com:apache/airflow.git',
-        ('kubernetes', 'git_branch'): 'master',
-        ('kubernetes', 'git_dags_folder_mount_point'): '/usr/local/airflow/dags',
-        ('kubernetes', 'delete_worker_pods'): 'True',
-        ('kubernetes', 'kube_client_request_args'): '{"_request_timeout" : [60,360]}',
-    })
-    def test_worker_configuration_auth_both_ssh_and_user(self):
-        with self.assertRaisesRegex(AirflowConfigException,
-                                    'either `git_user` and `git_password`.*'
-                                    'or `git_ssh_key_secret_name`.*'
-                                    'but not both$'):
-            KubeConfig()
-
-    def test_worker_with_subpaths(self):
-        self.kube_config.dags_volume_subpath = 'dags'
-        self.kube_config.logs_volume_subpath = 'logs'
-        worker_config = WorkerConfiguration(self.kube_config)
-        volumes, volume_mounts = worker_config._get_volumes_and_mounts()
-
-        for volume in [value for value in volumes.values()]:
-            self.assertNotIn(
-                'subPath', volume,
-                "subPath isn't valid configuration for a volume"
-            )
-
-        for volume_mount in [value for value in volume_mounts.values()]:
-            if volume_mount['name'] != 'airflow-config':
-                self.assertIn(
-                    'subPath', volume_mount,
-                    "subPath should've been passed to volumeMount configuration"
-                )
-
-    def test_worker_generate_dag_volume_mount_path(self):
-        self.kube_config.git_dags_folder_mount_point = '/root/airflow/git/dags'
-        self.kube_config.dags_folder = '/root/airflow/dags'
-        worker_config = WorkerConfiguration(self.kube_config)
-
-        self.kube_config.dags_volume_claim = 'airflow-dags'
-        self.kube_config.dags_volume_host = ''
-        dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
-        self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder)
-
-        self.kube_config.dags_volume_claim = ''
-        self.kube_config.dags_volume_host = '/host/airflow/dags'
-        dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
-        self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder)
-
-        self.kube_config.dags_volume_claim = ''
-        self.kube_config.dags_volume_host = ''
-        dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
-        self.assertEqual(dag_volume_mount_path,
-                         self.kube_config.git_dags_folder_mount_point)
-
-    def test_worker_environment_no_dags_folder(self):
-        self.kube_config.airflow_configmap = ''
-        self.kube_config.git_dags_folder_mount_point = ''
-        self.kube_config.dags_folder = ''
-        worker_config = WorkerConfiguration(self.kube_config)
-        env = worker_config._get_environment()
-
-        self.assertNotIn('AIRFLOW__CORE__DAGS_FOLDER', env)
-
-    def test_worker_environment_when_dags_folder_specified(self):
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_dags_folder_mount_point = ''
-        dags_folder = '/workers/path/to/dags'
-        self.kube_config.dags_folder = dags_folder
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        env = worker_config._get_environment()
-
-        self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
-
-    def test_worker_environment_dags_folder_using_git_sync(self):
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_sync_dest = 'repo'
-        self.kube_config.git_subpath = 'dags'
-        self.kube_config.git_dags_folder_mount_point = '/workers/path/to/dags'
-
-        dags_folder = '{}/{}/{}'.format(self.kube_config.git_dags_folder_mount_point,
-                                        self.kube_config.git_sync_dest,
-                                        self.kube_config.git_subpath)
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        env = worker_config._get_environment()
-
-        self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
-
-    def test_init_environment_using_git_sync_ssh_without_known_hosts(self):
-        # Tests the init environment created with git-sync SSH authentication option is correct
-        # without known hosts file
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_ssh_secret_name = 'airflow-secrets'
-        self.kube_config.git_ssh_known_hosts_configmap_name = None
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        init_containers = worker_config._get_init_containers()
-
-        self.assertTrue(init_containers)  # check not empty
-        env = init_containers[0]['env']
-
-        self.assertTrue({'name': 'GIT_SSH_KEY_FILE', 'value': '/etc/git-secret/ssh'} in env)
-        self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'false'} in env)
-        self.assertTrue({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
-
-    def test_init_environment_using_git_sync_ssh_with_known_hosts(self):
-        # Tests the init environment created with git-sync SSH authentication option is correct
-        # with known hosts file
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
-        self.kube_config.git_ssh_known_hosts_configmap_name = 'airflow-configmap'
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        init_containers = worker_config._get_init_containers()
-
-        self.assertTrue(init_containers)  # check not empty
-        env = init_containers[0]['env']
-
-        self.assertTrue({'name': 'GIT_SSH_KEY_FILE', 'value': '/etc/git-secret/ssh'} in env)
-        self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'true'} in env)
-        self.assertTrue({'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
-                         'value': '/etc/git-secret/known_hosts'} in env)
-        self.assertTrue({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
-
-    def test_init_environment_using_git_sync_user_without_known_hosts(self):
-        # Tests the init environment created with git-sync User authentication option is correct
-        # without known hosts file
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_user = 'git_user'
-        self.kube_config.git_password = 'git_password'
-        self.kube_config.git_ssh_known_hosts_configmap_name = None
-        self.kube_config.git_ssh_key_secret_name = None
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        init_containers = worker_config._get_init_containers()
-
-        self.assertTrue(init_containers)  # check not empty
-        env = init_containers[0]['env']
-
-        self.assertFalse({'name': 'GIT_SSH_KEY_FILE', 'value': '/etc/git-secret/ssh'} in env)
-        self.assertTrue({'name': 'GIT_SYNC_USERNAME', 'value': 'git_user'} in env)
-        self.assertTrue({'name': 'GIT_SYNC_PASSWORD', 'value': 'git_password'} in env)
-        self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'false'} in env)
-        self.assertFalse({'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
-                          'value': '/etc/git-secret/known_hosts'} in env)
-        self.assertFalse({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
-
-    def test_init_environment_using_git_sync_user_with_known_hosts(self):
-        # Tests the init environment created with git-sync User authentication option is correct
-        # with known hosts file
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_user = 'git_user'
-        self.kube_config.git_password = 'git_password'
-        self.kube_config.git_ssh_known_hosts_configmap_name = 'airflow-configmap'
-        self.kube_config.git_ssh_key_secret_name = None
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        init_containers = worker_config._get_init_containers()
-
-        self.assertTrue(init_containers)  # check not empty
-        env = init_containers[0]['env']
-
-        self.assertFalse({'name': 'GIT_SSH_KEY_FILE', 'value': '/etc/git-secret/ssh'} in env)
-        self.assertTrue({'name': 'GIT_SYNC_USERNAME', 'value': 'git_user'} in env)
-        self.assertTrue({'name': 'GIT_SYNC_PASSWORD', 'value': 'git_password'} in env)
-        self.assertTrue({'name': 'GIT_KNOWN_HOSTS', 'value': 'true'} in env)
-        self.assertTrue({'name': 'GIT_SSH_KNOWN_HOSTS_FILE',
-                         'value': '/etc/git-secret/known_hosts'} in env)
-        self.assertFalse({'name': 'GIT_SYNC_SSH', 'value': 'true'} in env)
-
-    def test_make_pod_git_sync_credentials_secret(self):
-        # Tests the pod created with git_sync_credentials_secret will get into the init container
-        self.kube_config.git_sync_credentials_secret = 'airflow-git-creds-secret'
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-        self.kube_config.worker_fs_group = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        username_env = {
-            'name': 'GIT_SYNC_USERNAME',
-            'valueFrom': {
-                'secretKeyRef': {
-                    'name': self.kube_config.git_sync_credentials_secret,
-                    'key': 'GIT_SYNC_USERNAME'
-                }
-            }
-        }
-        password_env = {
-            'name': 'GIT_SYNC_PASSWORD',
-            'valueFrom': {
-                'secretKeyRef': {
-                    'name': self.kube_config.git_sync_credentials_secret,
-                    'key': 'GIT_SYNC_PASSWORD'
-                }
-            }
-        }
-
-        self.assertIn(username_env, pod.init_containers[0]["env"],
-                      'The username env for git credentials did not get into the init container')
-
-        self.assertIn(password_env, pod.init_containers[0]["env"],
-                      'The password env for git credentials did not get into the init container')
-
-    def test_make_pod_git_sync_rev(self):
-        # Tests the pod created with git_sync_credentials_secret will get into the init container
-        self.kube_config.git_sync_rev = 'sampletag'
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-        self.kube_config.worker_fs_group = None
-        self.kube_config.git_dags_folder_mount_point = 'dags'
-        self.kube_config.git_sync_dest = 'repo'
-        self.kube_config.git_subpath = 'path'
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        rev_env = {
-            'name': 'GIT_SYNC_REV',
-            'value': self.kube_config.git_sync_rev
-        }
-
-        self.assertIn(rev_env, pod.init_containers[0]["env"],
-                      'The git_sync_rev env did not get into the init container')
-
-    def test_init_environment_using_git_sync_run_as_user_empty(self):
-        # Tests if git_syn_run_as_user is none, then no securityContext created in init container
-
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-        self.kube_config.git_sync_run_as_user = ''
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        init_containers = worker_config._get_init_containers()
-        self.assertTrue(init_containers)  # check not empty
-
-        self.assertNotIn(
-            'securityContext', init_containers[0],
-            "securityContext shouldn't be defined"
-        )
-
-    def test_make_pod_run_as_user_0(self):
-        # Tests the pod created with run-as-user 0 actually gets that in it's config
-        self.kube_config.worker_run_as_user = 0
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-        self.kube_config.worker_fs_group = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        self.assertEqual(0, pod.security_context['runAsUser'])
-
-    def test_make_pod_assert_labels(self):
-        # Tests the pod created has all the expected labels set
-        self.kube_config.dags_folder = 'dags'
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-
-                                                        volume_mounts=[])
-        pod = worker_config.make_pod("default", "sample-uuid", "test_pod_id", "test_dag_id",
-                                     "test_task_id", "2019-11-21 11:08:22.920875", 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-        expected_labels = {
-            'airflow-worker': 'sample-uuid',
-            'airflow_version': airflow_version.replace('+', '-'),
-            'dag_id': 'test_dag_id',
-            'execution_date': '2019-11-21 11:08:22.920875',
-            'kubernetes_executor': 'True',
-            'my_label': 'label_id',
-            'task_id': 'test_task_id',
-            'try_number': '1'
-        }
-        self.assertEqual(pod.labels, expected_labels)
-
-    def test_make_pod_git_sync_ssh_without_known_hosts(self):
-        # Tests the pod created with git-sync SSH authentication option is correct without known hosts
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-        self.kube_config.worker_fs_group = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        init_containers = worker_config._get_init_containers()
-        git_ssh_key_file = next((x['value'] for x in init_containers[0]['env']
-                                 if x['name'] == 'GIT_SSH_KEY_FILE'), None)
-        volume_mount_ssh_key = next((x['mountPath'] for x in init_containers[0]['volumeMounts']
-                                     if x['name'] == worker_config.git_sync_ssh_secret_volume_name),
-                                    None)
-        self.assertTrue(git_ssh_key_file)
-        self.assertTrue(volume_mount_ssh_key)
-        self.assertEqual(65533, pod.security_context['fsGroup'])
-        self.assertEqual(git_ssh_key_file,
-                         volume_mount_ssh_key,
-                         'The location where the git ssh secret is mounted'
-                         ' needs to be the same as the GIT_SSH_KEY_FILE path')
-
-    def test_make_pod_git_sync_ssh_with_known_hosts(self):
-        # Tests the pod created with git-sync SSH authentication option is correct with known hosts
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.git_ssh_secret_name = 'airflow-secrets'
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-
-        worker_config = WorkerConfiguration(self.kube_config)
-
-        init_containers = worker_config._get_init_containers()
-        git_ssh_known_hosts_file = next((x['value'] for x in init_containers[0]['env']
-                                         if x['name'] == 'GIT_SSH_KNOWN_HOSTS_FILE'), None)
-
-        volume_mount_ssh_known_hosts_file = next(
-            (x['mountPath'] for x in init_containers[0]['volumeMounts']
-             if x['name'] == worker_config.git_sync_ssh_known_hosts_volume_name),
-            None)
-        self.assertTrue(git_ssh_known_hosts_file)
-        self.assertTrue(volume_mount_ssh_known_hosts_file)
-        self.assertEqual(git_ssh_known_hosts_file,
-                         volume_mount_ssh_known_hosts_file,
-                         'The location where the git known hosts file is mounted'
-                         ' needs to be the same as the GIT_SSH_KNOWN_HOSTS_FILE path')
-
-    def test_make_pod_with_empty_executor_config(self):
-        self.kube_config.kube_affinity = self.affinity_config
-        self.kube_config.kube_tolerations = self.tolerations_config
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[]
-                                                        )
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        self.assertTrue(pod.affinity['podAntiAffinity'] is not None)
-        self.assertEqual('app',
-                         pod.affinity['podAntiAffinity']
-                         ['requiredDuringSchedulingIgnoredDuringExecution'][0]
-                         ['labelSelector']
-                         ['matchExpressions'][0]
-                         ['key'])
-
-        self.assertEqual(2, len(pod.tolerations))
-        self.assertEqual('prod', pod.tolerations[1]['key'])
-
-    def test_make_pod_with_executor_config(self):
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(affinity=self.affinity_config,
-                                                        tolerations=self.tolerations_config,
-                                                        annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[]
-                                                        )
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        self.assertTrue(pod.affinity['podAntiAffinity'] is not None)
-        self.assertEqual('app',
-                         pod.affinity['podAntiAffinity']
-                         ['requiredDuringSchedulingIgnoredDuringExecution'][0]
-                         ['labelSelector']
-                         ['matchExpressions'][0]
-                         ['key'])
-
-        self.assertEqual(2, len(pod.tolerations))
-        self.assertEqual('prod', pod.tolerations[1]['key'])
-
-    def test_worker_pvc_dags(self):
-        # Tests persistence volume config created when `dags_volume_claim` is set
-        self.kube_config.dags_volume_claim = 'airflow-dags'
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        volumes, volume_mounts = worker_config._get_volumes_and_mounts()
-
-        init_containers = worker_config._get_init_containers()
-
-        dag_volume = [volume for volume in volumes.values() if volume['name'] == 'airflow-dags']
-        dag_volume_mount = [mount for mount in volume_mounts.values() if mount['name'] == 'airflow-dags']
-
-        self.assertEqual('airflow-dags', dag_volume[0]['persistentVolumeClaim']['claimName'])
-        self.assertEqual(1, len(dag_volume_mount))
-        self.assertTrue(dag_volume_mount[0]['readOnly'])
-        self.assertEqual(0, len(init_containers))
-
-    def test_worker_git_dags(self):
-        # Tests persistence volume config created when `git_repo` is set
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_folder = '/usr/local/airflow/dags'
-        self.kube_config.worker_dags_folder = '/usr/local/airflow/dags'
-
-        self.kube_config.git_sync_container_repository = 'gcr.io/google-containers/git-sync-amd64'
-        self.kube_config.git_sync_container_tag = 'v2.0.5'
-        self.kube_config.git_sync_container = 'gcr.io/google-containers/git-sync-amd64:v2.0.5'
-        self.kube_config.git_sync_init_container_name = 'git-sync-clone'
-        self.kube_config.git_subpath = 'dags_folder'
-        self.kube_config.git_sync_root = '/git'
-        self.kube_config.git_sync_run_as_user = 65533
-        self.kube_config.git_dags_folder_mount_point = '/usr/local/airflow/dags/repo/dags_folder'
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        volumes, volume_mounts = worker_config._get_volumes_and_mounts()
-
-        dag_volume = [volume for volume in volumes.values() if volume['name'] == 'airflow-dags']
-        dag_volume_mount = [mount for mount in volume_mounts.values() if mount['name'] == 'airflow-dags']
-
-        self.assertTrue('emptyDir' in dag_volume[0])
-        self.assertEqual(self.kube_config.git_dags_folder_mount_point, dag_volume_mount[0]['mountPath'])
-        self.assertTrue(dag_volume_mount[0]['readOnly'])
-
-        init_container = worker_config._get_init_containers()[0]
-        init_container_volume_mount = [mount for mount in init_container['volumeMounts']
-                                       if mount['name'] == 'airflow-dags']
-
-        self.assertEqual('git-sync-clone', init_container['name'])
-        self.assertEqual('gcr.io/google-containers/git-sync-amd64:v2.0.5', init_container['image'])
-        self.assertEqual(1, len(init_container_volume_mount))
-        self.assertFalse(init_container_volume_mount[0]['readOnly'])
-        self.assertEqual(65533, init_container['securityContext']['runAsUser'])
-
-    def test_worker_container_dags(self):
-        # Tests that the 'airflow-dags' persistence volume is NOT created when `dags_in_image` is set
-        self.kube_config.dags_in_image = True
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        volumes, volume_mounts = worker_config._get_volumes_and_mounts()
-
-        dag_volume = [volume for volume in volumes.values() if volume['name'] == 'airflow-dags']
-        dag_volume_mount = [mount for mount in volume_mounts.values() if mount['name'] == 'airflow-dags']
-
-        init_containers = worker_config._get_init_containers()
-
-        self.assertEqual(0, len(dag_volume))
-        self.assertEqual(0, len(dag_volume_mount))
-        self.assertEqual(0, len(init_containers))
-
-    def test_set_airflow_config_configmap(self):
-        """
-        Test that airflow.cfg can be set via configmap by
-        checking volume & volume-mounts are set correctly.
-        """
-        self.kube_config.airflow_home = '/usr/local/airflow'
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.airflow_local_settings_configmap = None
-        self.kube_config.dags_folder = '/workers/path/to/dags'
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        airflow_config_volume = [
-            volume for volume in pod.volumes if volume["name"] == 'airflow-config'
-        ]
-        # Test that volume_name is found
-        self.assertEqual(1, len(airflow_config_volume))
-
-        # Test that config map exists
-        self.assertEqual(
-            {'configMap': {'name': 'airflow-configmap'}, 'name': 'airflow-config'},
-            airflow_config_volume[0]
-        )
-
-        # Test that only 1 Volume Mounts exists with 'airflow-config' name
-        # One for airflow.cfg
-        volume_mounts = [
-            volume_mount for volume_mount in pod.volume_mounts
-            if volume_mount['name'] == 'airflow-config'
-        ]
-
-        self.assertEqual([
-            {
-                'mountPath': '/usr/local/airflow/airflow.cfg',
-                'name': 'airflow-config',
-                'readOnly': True,
-                'subPath': 'airflow.cfg',
-            }
-        ],
-            volume_mounts
-        )
-
-    def test_set_airflow_local_settings_configmap(self):
-        """
-        Test that airflow_local_settings.py can be set via configmap by
-        checking volume & volume-mounts are set correctly.
-        """
-        self.kube_config.airflow_home = '/usr/local/airflow'
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.airflow_local_settings_configmap = 'airflow-configmap'
-        self.kube_config.dags_folder = '/workers/path/to/dags'
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        airflow_config_volume = [
-            volume for volume in pod.volumes if volume["name"] == 'airflow-config'
-        ]
-        # Test that volume_name is found
-        self.assertEqual(1, len(airflow_config_volume))
-
-        # Test that config map exists
-        self.assertEqual(
-            {'configMap': {'name': 'airflow-configmap'}, 'name': 'airflow-config'},
-            airflow_config_volume[0]
-        )
-
-        # Test that 2 Volume Mounts exists and has 2 different mount-paths
-        # One for airflow.cfg
-        # Second for airflow_local_settings.py
-        volume_mounts = [
-            volume_mount for volume_mount in pod.volume_mounts
-            if volume_mount['name'] == 'airflow-config'
-        ]
-        self.assertEqual(2, len(volume_mounts))
-
-        six.assertCountEqual(
-            self,
-            [
-                {
-                    'mountPath': '/usr/local/airflow/airflow.cfg',
-                    'name': 'airflow-config',
-                    'readOnly': True,
-                    'subPath': 'airflow.cfg',
-                },
-                {
-                    'mountPath': '/usr/local/airflow/config/airflow_local_settings.py',
-                    'name': 'airflow-config',
-                    'readOnly': True,
-                    'subPath': 'airflow_local_settings.py',
-                }
-            ],
-            volume_mounts
-        )
-
-    def test_set_airflow_configmap_different_for_local_setting(self):
-        """
-        Test that airflow_local_settings.py can be set via configmap by
-        checking volume & volume-mounts are set correctly.
-        """
-        self.kube_config.airflow_home = '/usr/local/airflow'
-        self.kube_config.airflow_configmap = 'airflow-configmap'
-        self.kube_config.airflow_local_settings_configmap = 'airflow-ls-configmap'
-        self.kube_config.dags_folder = '/workers/path/to/dags'
-
-        worker_config = WorkerConfiguration(self.kube_config)
-        kube_executor_config = KubernetesExecutorConfig(annotations=[],
-                                                        volumes=[],
-                                                        volume_mounts=[])
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'",
-                                     kube_executor_config)
-
-        airflow_local_settings_volume = [
-            volume for volume in pod.volumes if volume["name"] == 'airflow-local-settings'
-        ]
-        # Test that volume_name is found
-        self.assertEqual(1, len(airflow_local_settings_volume))
-
-        # Test that config map exists
-        self.assertEqual(
-            [{'configMap': {'name': 'airflow-ls-configmap'}, 'name': 'airflow-local-settings'}],
-            airflow_local_settings_volume
-        )
-
-        # Test that 2 Volume Mounts exists and has 2 different mount-paths
-        # One for airflow.cfg
-        # Second for airflow_local_settings.py
-        airflow_cfg_volume_mount = [
-            volume_mount for volume_mount in pod.volume_mounts
-            if volume_mount['name'] == 'airflow-config'
-        ]
-
-        local_setting_volume_mount = [
-            volume_mount for volume_mount in pod.volume_mounts
-            if volume_mount['name'] == 'airflow-local-settings'
-        ]
-        self.assertEqual(1, len(airflow_cfg_volume_mount))
-        self.assertEqual(1, len(local_setting_volume_mount))
-
-        self.assertEqual(
-            [
-                {
-                    'mountPath': '/usr/local/airflow/config/airflow_local_settings.py',
-                    'name': 'airflow-local-settings',
-                    'readOnly': True,
-                    'subPath': 'airflow_local_settings.py',
-                }
-            ],
-            local_setting_volume_mount
-        )
-
-        print(airflow_cfg_volume_mount)
-
-        self.assertEqual(
-            [
-                {
-                    'mountPath': '/usr/local/airflow/airflow.cfg',
-                    'name': 'airflow-config',
-                    'readOnly': True,
-                    'subPath': 'airflow.cfg',
-                }
-            ],
-            airflow_cfg_volume_mount
-        )
-
-    def test_kubernetes_environment_variables(self):
-        # Tests the kubernetes environment variables get copied into the worker pods
-        input_environment = {
-            'ENVIRONMENT': 'prod',
-            'LOG_LEVEL': 'warning'
-        }
-        self.kube_config.kube_env_vars = input_environment
-        worker_config = WorkerConfiguration(self.kube_config)
-        env = worker_config._get_environment()
-        for key in input_environment:
-            self.assertIn(key, env)
-            self.assertIn(input_environment[key], env.values())
-
-        core_executor = 'AIRFLOW__CORE__EXECUTOR'
-        input_environment = {
-            core_executor: 'NotLocalExecutor'
-        }
-        self.kube_config.kube_env_vars = input_environment
-        worker_config = WorkerConfiguration(self.kube_config)
-        env = worker_config._get_environment()
-        self.assertEqual(env[core_executor], 'LocalExecutor')
-
-    def test_kubernetes_environment_variables_for_init_container(self):
-        self.kube_config.dags_volume_claim = None
-        self.kube_config.dags_volume_host = None
-        self.kube_config.dags_in_image = None
-
-        # Tests the kubernetes environment variables get copied into the worker pods
-        input_environment = {
-            'ENVIRONMENT': 'prod',
-            'LOG_LEVEL': 'warning'
-        }
-        self.kube_config.kube_env_vars = input_environment
-        worker_config = WorkerConfiguration(self.kube_config)
-        env = worker_config._get_environment()
-        for key in input_environment:
-            self.assertIn(key, env)
-            self.assertIn(input_environment[key], env.values())
-
-        init_containers = worker_config._get_init_containers()
-
-        self.assertTrue(init_containers)  # check not empty
-        env = init_containers[0]['env']
-
-        self.assertTrue({'name': 'ENVIRONMENT', 'value': 'prod'} in env)
-        self.assertTrue({'name': 'LOG_LEVEL', 'value': 'warning'} in env)
-
-    def test_get_secrets(self):
-        # Test when secretRef is None and kube_secrets is not empty
-        self.kube_config.kube_secrets = {
-            'AWS_SECRET_KEY': 'airflow-secret=aws_secret_key',
-            'POSTGRES_PASSWORD': 'airflow-secret=postgres_credentials'
-        }
-        self.kube_config.env_from_secret_ref = None
-        worker_config = WorkerConfiguration(self.kube_config)
-        secrets = worker_config._get_secrets()
-        secrets.sort(key=lambda secret: secret.deploy_target)
-        expected = [
-            Secret('env', 'AWS_SECRET_KEY', 'airflow-secret', 'aws_secret_key'),
-            Secret('env', 'POSTGRES_PASSWORD', 'airflow-secret', 'postgres_credentials')
-        ]
-        self.assertListEqual(expected, secrets)
-
-        # Test when secret is not empty and kube_secrets is empty dict
-        self.kube_config.kube_secrets = {}
-        self.kube_config.env_from_secret_ref = 'secret_a,secret_b'
-        worker_config = WorkerConfiguration(self.kube_config)
-        secrets = worker_config._get_secrets()
-        expected = [
-            Secret('env', None, 'secret_a'),
-            Secret('env', None, 'secret_b')
-        ]
-        self.assertListEqual(expected, secrets)
-
-    def test_get_configmaps(self):
-        # Test when configmap is empty
-        self.kube_config.env_from_configmap_ref = ''
-        worker_config = WorkerConfiguration(self.kube_config)
-        configmaps = worker_config._get_configmaps()
-        self.assertListEqual([], configmaps)
-
-        # test when configmap is not empty
-        self.kube_config.env_from_configmap_ref = 'configmap_a,configmap_b'
-        worker_config = WorkerConfiguration(self.kube_config)
-        configmaps = worker_config._get_configmaps()
-        self.assertListEqual(['configmap_a', 'configmap_b'], configmaps)
-
-    def test_get_labels(self):
-        worker_config = WorkerConfiguration(self.kube_config)
-        labels = worker_config._get_labels({'my_kube_executor_label': 'kubernetes'}, {
-            'dag_id': 'override_dag_id',
-        })
-        self.assertEqual({
-            'my_label': 'label_id',
-            'dag_id': 'override_dag_id',
-            'my_kube_executor_label': 'kubernetes',
-        }, labels)
-
-
 class TestKubernetesExecutor(unittest.TestCase):
     """
     Tests if an ApiException from the Kube Client will cause the task to
@@ -1001,6 +145,9 @@ class TestKubernetesExecutor(unittest.TestCase):
         mock_kube_client.create_namespaced_pod = mock.MagicMock(
             side_effect=ApiException(http_resp=r))
         mock_get_kube_client.return_value = mock_kube_client
+        mock_api_client = mock.MagicMock()
+        mock_api_client.sanitize_for_serialization.return_value = {}
+        mock_kube_client.api_client = mock_api_client
 
         kubernetesExecutor = KubernetesExecutor()
         kubernetesExecutor.start()
@@ -1045,10 +192,12 @@ class TestKubernetesExecutor(unittest.TestCase):
         executor._change_state(key, State.RUNNING, 'pod_id', 'default')
         self.assertTrue(executor.event_buffer[key] == State.RUNNING)
 
+    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
-    def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher,
+                                  mock_kube_config):
         executor = KubernetesExecutor()
         executor.start()
         test_time = timezone.utcnow()
@@ -1057,11 +206,12 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
         mock_delete_pod.assert_called_once_with('pod_id', 'default')
 
+    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
-    def test_change_state_failed_no_deletion(self, mock_delete_pod, mock_get_kube_client,
-                                             mock_kubernetes_job_watcher):
+    def test_change_state_failed(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher,
+                                 mock_kube_config):
         executor = KubernetesExecutor()
         executor.kube_config.delete_worker_pods = False
         executor.kube_config.delete_worker_pods_on_failure = False
@@ -1072,11 +222,12 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.assertTrue(executor.event_buffer[key] == State.FAILED)
         mock_delete_pod.assert_not_called()
 
+    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
     def test_change_state_skip_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
-                                            mock_kubernetes_job_watcher):
+                                            mock_kubernetes_job_watcher, mock_kube_config):
         test_time = timezone.utcnow()
         executor = KubernetesExecutor()
         executor.kube_config.delete_worker_pods = False
@@ -1087,11 +238,12 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
         mock_delete_pod.assert_not_called()
 
+    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
     def test_change_state_failed_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
-                                              mock_kubernetes_job_watcher):
+                                              mock_kubernetes_job_watcher, mock_kube_config):
         executor = KubernetesExecutor()
         executor.kube_config.delete_worker_pods_on_failure = True
 
diff --git a/tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py b/tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
deleted file mode 100644
index 87d0073..0000000
--- a/tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py
+++ /dev/null
@@ -1,396 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from airflow.kubernetes.kubernetes_request_factory.kubernetes_request_factory import KubernetesRequestFactory
-from airflow.kubernetes.pod import Pod, Resources
-from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
-from airflow.kubernetes.secret import Secret
-from parameterized import parameterized
-import unittest
-import copy
-
-
-class TestKubernetesRequestFactory(unittest.TestCase):
-
-    def setUp(self):
-
-        self.expected = {
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {
-                'name': 'name'
-            },
-            'spec': {
-                'containers': [{
-                    'name': 'base',
-                    'image': 'airflow-worker:latest',
-                    'command': [
-                        "/usr/local/airflow/entrypoint.sh",
-                        "/bin/bash sleep 25"
-                    ],
-                }],
-                'restartPolicy': 'Never'
-            }
-        }
-        self.input_req = copy.deepcopy(self.expected)
-
-    def test_extract_image(self):
-        image = 'v3.14'
-        pod = Pod(image, {}, [])
-        KubernetesRequestFactory.extract_image(pod, self.input_req)
-        self.expected['spec']['containers'][0]['image'] = image
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_image_pull_policy(self):
-        # Test when pull policy is not none
-        pull_policy = 'IfNotPresent'
-        pod = Pod('v3.14', {}, [], image_pull_policy=pull_policy)
-
-        KubernetesRequestFactory.extract_image_pull_policy(pod, self.input_req)
-        self.expected['spec']['containers'][0]['imagePullPolicy'] = pull_policy
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_add_secret_to_env(self):
-        secret = Secret('env', 'target', 'my-secret', 'KEY')
-        secret_list = []
-        self.expected = [{
-            'name': 'TARGET',
-            'valueFrom': {
-                'secretKeyRef': {
-                    'name': 'my-secret',
-                    'key': 'KEY'
-                }
-            }
-        }]
-        KubernetesRequestFactory.add_secret_to_env(secret_list, secret)
-        self.assertListEqual(secret_list, self.expected)
-
-    def test_extract_labels(self):
-        # Test when labels are not empty
-        labels = {'label_a': 'val_a', 'label_b': 'val_b'}
-        pod = Pod('v3.14', {}, [], labels=labels)
-        self.expected['metadata']['labels'] = labels
-        KubernetesRequestFactory.extract_labels(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_annotations(self):
-        # Test when annotations are not empty
-        annotations = {'annot_a': 'val_a', 'annot_b': 'val_b'}
-        pod = Pod('v3.14', {}, [], annotations=annotations)
-        self.expected['metadata']['annotations'] = annotations
-        KubernetesRequestFactory.extract_annotations(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_affinity(self):
-        # Test when affinity is not empty
-        affinity = {'podAffinity': 'requiredDuringSchedulingIgnoredDuringExecution'}
-        pod = Pod('v3.14', {}, [], affinity=affinity)
-        self.expected['spec']['affinity'] = affinity
-        KubernetesRequestFactory.extract_affinity(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_node_selector(self):
-        # Test when affinity is not empty
-        node_selectors = {'disktype': 'ssd', 'accelerator': 'nvidia-tesla-p100'}
-        pod = Pod('v3.14', {}, [], node_selectors=node_selectors)
-        self.expected['spec']['nodeSelector'] = node_selectors
-        KubernetesRequestFactory.extract_node_selector(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_cmds(self):
-        cmds = ['test-cmd.sh']
-        pod = Pod('v3.14', {}, cmds)
-        KubernetesRequestFactory.extract_cmds(pod, self.input_req)
-        self.expected['spec']['containers'][0]['command'] = cmds
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_args(self):
-        args = ['test_arg.sh']
-        pod = Pod('v3.14', {}, [], args=args)
-        KubernetesRequestFactory.extract_args(pod, self.input_req)
-        self.expected['spec']['containers'][0]['args'] = args
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_attach_volumes(self):
-        # Test when volumes is not empty list
-        volumes = ['vol_a', 'vol_b']
-        pod = Pod('v3.14', {}, [], volumes=volumes)
-        self.expected['spec']['volumes'] = volumes
-        KubernetesRequestFactory.attach_volumes(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_attach_volume_mounts(self):
-        # Test when volumes is not empty list
-        volume_mounts = ['vol_a', 'vol_b']
-        pod = Pod('v3.14', {}, [], volume_mounts=volume_mounts)
-        self.expected['spec']['containers'][0]['volumeMounts'] = volume_mounts
-        KubernetesRequestFactory.attach_volume_mounts(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_name(self):
-        name = 'pod-name'
-        pod = Pod('v3.14', {}, [], name=name)
-        self.expected['metadata']['name'] = name
-        KubernetesRequestFactory.extract_name(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_volume_secrets(self):
-        # Test when secrets is not empty
-        secrets = [
-            Secret('volume', 'KEY1', 's1', 'key-1'),
-            Secret('env', 'KEY2', 's2'),
-            Secret('volume', 'KEY3', 's3', 'key-2')
-        ]
-        pod = Pod('v3.14', {}, [], secrets=secrets)
-        self.expected['spec']['containers'][0]['volumeMounts'] = [{
-            'mountPath': 'KEY1',
-            'name': 'secretvol0',
-            'readOnly': True
-        }, {
-            'mountPath': 'KEY3',
-            'name': 'secretvol1',
-            'readOnly': True
-        }]
-        self.expected['spec']['volumes'] = [{
-            'name': 'secretvol0',
-            'secret': {
-                'secretName': 's1'
-            }
-        }, {
-            'name': 'secretvol1',
-            'secret': {
-                'secretName': 's3'
-            }
-        }]
-        KubernetesRequestFactory.extract_volume_secrets(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_env_and_secrets(self):
-        # Test when secrets and envs are not empty
-        secrets = [
-            Secret('env', None, 's1'),
-            Secret('volume', 'KEY2', 's2', 'key-2'),
-            Secret('env', None, 's3')
-        ]
-        envs = {
-            'ENV1': 'val1',
-            'ENV2': 'val2'
-        }
-        configmaps = ['configmap_a', 'configmap_b']
-        pod_runtime_envs = [PodRuntimeInfoEnv("ENV3", "status.podIP")]
-        pod = Pod(
-            image='v3.14',
-            envs=envs,
-            cmds=[],
-            secrets=secrets,
-            configmaps=configmaps,
-            pod_runtime_info_envs=pod_runtime_envs)
-        self.expected['spec']['containers'][0]['env'] = [
-            {'name': 'ENV1', 'value': 'val1'},
-            {'name': 'ENV2', 'value': 'val2'},
-            {
-                'name': 'ENV3',
-                'valueFrom': {
-                    'fieldRef': {
-                        'fieldPath': 'status.podIP'
-                    }
-                }
-            }
-        ]
-        self.expected['spec']['containers'][0]['envFrom'] = [{
-            'secretRef': {
-                'name': 's1'
-            }
-        }, {
-            'secretRef': {
-                'name': 's3'
-            }
-        }, {
-            'configMapRef': {
-                'name': 'configmap_a'
-            }
-        }, {
-            'configMapRef': {
-                'name': 'configmap_b'
-            }
-        }]
-
-        KubernetesRequestFactory.extract_env_and_secrets(pod, self.input_req)
-        self.input_req['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_env_and_secret_order(self):
-        envs = {
-            'ENV': 'val1',
-        }
-        pod_runtime_envs = [PodRuntimeInfoEnv('RUNTIME_ENV', 'status.podIP')]
-        pod = Pod(
-            image='v3.14',
-            envs=envs,
-            cmds=[],
-            pod_runtime_info_envs=pod_runtime_envs)
-        self.expected['spec']['containers'][0]['env'] = [
-            {
-                'name': 'RUNTIME_ENV',
-                'valueFrom': {
-                    'fieldRef': {
-                        'fieldPath': 'status.podIP'
-                    }
-                }
-            },
-            {'name': 'ENV', 'value': 'val1'}
-        ]
-        KubernetesRequestFactory.extract_env_and_secrets(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_resources(self):
-        # Test when resources is not empty
-        resources = Resources(
-            request_memory='1Gi',
-            request_cpu=1,
-            limit_memory='2Gi',
-            limit_cpu=2)
-
-        pod = Pod('v3.14', {}, [], resources=resources)
-        self.expected['spec']['containers'][0]['resources'] = {
-            'requests': {
-                'memory': '1Gi',
-                'cpu': 1
-            },
-            'limits': {
-                'memory': '2Gi',
-                'cpu': 2
-            },
-        }
-        KubernetesRequestFactory.extract_resources(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_display_resources(self):
-        resources_string = str(Resources('1Gi', 1))
-        self.assertEqual(
-            resources_string,
-            "Request: [cpu: 1, memory: 1Gi], Limit: [cpu: None, memory: None, gpu: None]")
-
-    def test_extract_limits(self):
-        # Test when resources is not empty
-        resources = Resources(
-            limit_memory='1Gi',
-            limit_cpu=1)
-
-        pod = Pod('v3.14', {}, [], resources=resources)
-        self.expected['spec']['containers'][0]['resources'] = {
-            'limits': {
-                'memory': '1Gi',
-                'cpu': 1
-            }
-        }
-        KubernetesRequestFactory.extract_resources(pod, self.input_req)
-        self.assertEqual(self.expected, self.input_req)
-
-    def test_extract_all_resources(self):
-        # Test when resources is not empty
-        resources = Resources(
-            request_memory='1Gi',
-            request_cpu=1,
-            limit_memory='2Gi',
-            limit_cpu=2,
-            limit_gpu=3)
-
-        pod = Pod('v3.14', {}, [], resources=resources)
-        self.expected['spec']['containers'][0]['resources'] = {
-            'requests': {
-                'memory': '1Gi',
-                'cpu': 1
-            },
-            'limits': {
-                'memory': '2Gi',
-                'cpu': 2,
-                'nvidia.com/gpu': 3
-            }
-        }
-        KubernetesRequestFactory.extract_resources(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_init_containers(self):
-        init_container = 'init_container'
-        pod = Pod('v3.14', {}, [], init_containers=init_container)
-        self.expected['spec']['initContainers'] = init_container
-        KubernetesRequestFactory.extract_init_containers(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_service_account_name(self):
-        service_account_name = 'service_account_name'
-        pod = Pod('v3.14', {}, [], service_account_name=service_account_name)
-        self.expected['spec']['serviceAccountName'] = service_account_name
-        KubernetesRequestFactory.extract_service_account_name(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_hostnetwork(self):
-        hostnetwork = True
-        pod = Pod('v3.14', {}, [], hostnetwork=hostnetwork)
-        self.expected['spec']['hostNetwork'] = hostnetwork
-        KubernetesRequestFactory.extract_hostnetwork(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_image_pull_secrets(self):
-        image_pull_secrets = 'secret_a,secret_b,secret_c'
-        pod = Pod('v3.14', {}, [], image_pull_secrets=image_pull_secrets)
-        self.expected['spec']['imagePullSecrets'] = [
-            {'name': 'secret_a'},
-            {'name': 'secret_b'},
-            {'name': 'secret_c'},
-        ]
-        KubernetesRequestFactory.extract_image_pull_secrets(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_extract_tolerations(self):
-        tolerations = [{
-            'key': 'key',
-            'operator': 'Equal',
-            'value': 'value',
-            'effect': 'NoSchedule'
-        }]
-        pod = Pod('v3.14', {}, [], tolerations=tolerations)
-        self.expected['spec']['tolerations'] = tolerations
-        KubernetesRequestFactory.extract_tolerations(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    def test_security_context(self):
-        security_context = {
-            'runAsUser': 1000,
-            'fsGroup': 2000
-        }
-        pod = Pod('v3.14', {}, [], security_context=security_context)
-        self.expected['spec']['securityContext'] = security_context
-        KubernetesRequestFactory.extract_security_context(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
-
-    @parameterized.expand([
-        'extract_resources',
-        'extract_init_containers',
-        'extract_service_account_name',
-        'extract_hostnetwork',
-        'extract_image_pull_secrets',
-        'extract_tolerations',
-        'extract_security_context',
-        'extract_volume_secrets'
-    ])
-    def test_identity(self, name):
-        kube_request_factory_func = getattr(KubernetesRequestFactory, name)
-        pod = Pod('v3.14', {}, [])
-        kube_request_factory_func(pod, self.input_req)
-        self.assertEqual(self.input_req, self.expected)
diff --git a/tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py b/tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
deleted file mode 100644
index 68617de..0000000
--- a/tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py
+++ /dev/null
@@ -1,175 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from airflow.kubernetes.kubernetes_request_factory.pod_request_factory import SimplePodRequestFactory, \
-    ExtractXcomPodRequestFactory
-from airflow.kubernetes.pod import Pod, Resources
-from airflow.kubernetes.secret import Secret
-from airflow.exceptions import AirflowConfigException
-import unittest
-
-XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 30; done;'
-
-
-class TestPodRequestFactory(unittest.TestCase):
-
-    def setUp(self):
-        self.simple_pod_request_factory = SimplePodRequestFactory()
-        self.xcom_pod_request_factory = ExtractXcomPodRequestFactory()
-        self.pod = Pod(
-            image='busybox',
-            envs={
-                'ENVIRONMENT': 'prod',
-                'LOG_LEVEL': 'warning'
-            },
-            name='myapp-pod',
-            cmds=['sh', '-c', 'echo Hello Kubernetes!'],
-            labels={'app': 'myapp'},
-            image_pull_secrets='pull_secret_a,pull_secret_b',
-            configmaps=['configmap_a', 'configmap_b'],
-            ports=[{'name': 'foo', 'containerPort': 1234}],
-            resources=Resources('1Gi', 1, '2Gi', 2, 1),
-            secrets=[
-                # This should be a secretRef
-                Secret('env', None, 'secret_a'),
-                # This should be a single secret mounted in volumeMounts
-                Secret('volume', '/etc/foo', 'secret_b'),
-                # This should produce a single secret mounted in env
-                Secret('env', 'TARGET', 'secret_b', 'source_b'),
-            ],
-            security_context={
-                'runAsUser': 1000,
-                'fsGroup': 2000,
-            }
-        )
-        self.maxDiff = None
-        self.expected = {
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {
-                'name': 'myapp-pod',
-                'labels': {'app': 'myapp'},
-                'annotations': {}},
-            'spec': {
-                'containers': [{
-                    'name': 'base',
-                    'image': 'busybox',
-                    'command': [
-                        'sh', '-c', 'echo Hello Kubernetes!'
-                    ],
-                    'imagePullPolicy': 'IfNotPresent',
-                    'args': [],
-                    'env': [{
-                        'name': 'ENVIRONMENT',
-                        'value': 'prod'
-                    }, {
-                        'name': 'LOG_LEVEL',
-                        'value': 'warning'
-                    }, {
-                        'name': 'TARGET',
-                        'valueFrom': {
-                            'secretKeyRef': {
-                                'name': 'secret_b',
-                                'key': 'source_b'
-                            }
-                        }
-                    }],
-                    'envFrom': [{
-                        'secretRef': {
-                            'name': 'secret_a'
-                        }
-                    }, {
-                        'configMapRef': {
-                            'name': 'configmap_a'
-                        }
-                    }, {
-                        'configMapRef': {
-                            'name': 'configmap_b'
-                        }
-                    }],
-                    'resources': {
-                        'requests': {
-                            'memory': '1Gi',
-                            'cpu': 1
-                        },
-                        'limits': {
-                            'memory': '2Gi',
-                            'cpu': 2,
-                            'nvidia.com/gpu': 1
-                        },
-                    },
-                    'ports': [{'name': 'foo', 'containerPort': 1234}],
-                    'volumeMounts': [{
-                        'mountPath': '/etc/foo',
-                        'name': 'secretvol0',
-                        'readOnly': True
-                    }]
-                }],
-                'restartPolicy': 'Never',
-                'nodeSelector': {},
-                'volumes': [{
-                    'name': 'secretvol0',
-                    'secret': {
-                        'secretName': 'secret_b'
-                    }
-                }],
-                'imagePullSecrets': [
-                    {'name': 'pull_secret_a'},
-                    {'name': 'pull_secret_b'}
-                ],
-                'affinity': {},
-                'securityContext': {
-                    'runAsUser': 1000,
-                    'fsGroup': 2000,
-                },
-            }
-        }
-
-    def test_secret_throws(self):
-        with self.assertRaises(AirflowConfigException):
-            Secret('volume', None, 'secret_a', 'key')
-
-    def test_simple_pod_request_factory_create(self):
-        result = self.simple_pod_request_factory.create(self.pod)
-        # sort
-        result['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
-        self.assertEqual(result, self.expected)
-
-    def test_xcom_pod_request_factory_create(self):
-        result = self.xcom_pod_request_factory.create(self.pod)
-        container_two = {
-            'name': 'airflow-xcom-sidecar',
-            'image': 'alpine',
-            'command': ['sh', '-c', XCOM_CMD],
-            'volumeMounts': [
-                {
-                    'name': 'xcom',
-                    'mountPath': '/airflow/xcom'
-                }
-            ],
-            'resources': {'requests': {'cpu': '1m'}},
-        }
-        self.expected['spec']['containers'].append(container_two)
-        self.expected['spec']['containers'][0]['volumeMounts'].insert(0, {
-            'name': 'xcom',
-            'mountPath': '/airflow/xcom'
-        })
-        self.expected['spec']['volumes'].insert(0, {
-            'name': 'xcom', 'emptyDir': {}
-        })
-        result['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
-        self.assertEqual(result, self.expected)
diff --git a/tests/kubernetes/kubernetes_request_factory/__init__.py b/tests/kubernetes/models/__init__.py
similarity index 100%
rename from tests/kubernetes/kubernetes_request_factory/__init__.py
rename to tests/kubernetes/models/__init__.py
diff --git a/tests/kubernetes/models/test_pod.py b/tests/kubernetes/models/test_pod.py
new file mode 100644
index 0000000..edcd364
--- /dev/null
+++ b/tests/kubernetes/models/test_pod.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from tests.compat import mock
+from kubernetes.client import ApiClient
+import kubernetes.client.models as k8s
+from airflow.kubernetes.pod import Port
+from airflow.kubernetes.pod_generator import PodGenerator
+from airflow.kubernetes.k8s_model import append_to_pod
+
+
+class TestPod(unittest.TestCase):
+
+    def test_port_to_k8s_client_obj(self):
+        port = Port('http', 80)
+        self.assertEqual(
+            port.to_k8s_client_obj(),
+            k8s.V1ContainerPort(
+                name='http',
+                container_port=80
+            )
+        )
+
+    @mock.patch('uuid.uuid4')
+    def test_port_attach_to_pod(self, mock_uuid):
+        mock_uuid.return_value = '0'
+        pod = PodGenerator(image='airflow-worker:latest', name='base').gen_pod()
+        ports = [
+            Port('https', 443),
+            Port('http', 80)
+        ]
+        k8s_client = ApiClient()
+        result = append_to_pod(pod, ports)
+        result = k8s_client.sanitize_for_serialization(result)
+        self.assertEqual({
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': {'name': 'base-0'},
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': [],
+                    'env': [],
+                    'envFrom': [],
+                    'image': 'airflow-worker:latest',
+                    'imagePullPolicy': 'IfNotPresent',
+                    'name': 'base',
+                    'ports': [{
+                        'name': 'https',
+                        'containerPort': 443
+                    }, {
+                        'name': 'http',
+                        'containerPort': 80
+                    }],
+                    'volumeMounts': [],
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'restartPolicy': 'Never',
+                'volumes': []
+            }
+        }, result)
diff --git a/tests/kubernetes/models/test_secret.py b/tests/kubernetes/models/test_secret.py
new file mode 100644
index 0000000..843bd79
--- /dev/null
+++ b/tests/kubernetes/models/test_secret.py
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from tests.compat import mock
+from kubernetes.client import ApiClient
+import kubernetes.client.models as k8s
+from airflow.kubernetes.secret import Secret
+from airflow.kubernetes.pod_generator import PodGenerator
+from airflow.kubernetes.k8s_model import append_to_pod
+
+
+class TestSecret(unittest.TestCase):
+
+    def test_to_env_secret(self):
+        secret = Secret('env', 'name', 'secret', 'key')
+        self.assertEqual(secret.to_env_secret(), k8s.V1EnvVar(
+            name='NAME',
+            value_from=k8s.V1EnvVarSource(
+                secret_key_ref=k8s.V1SecretKeySelector(
+                    name='secret',
+                    key='key'
+                )
+            )
+        ))
+
+    def test_to_env_from_secret(self):
+        secret = Secret('env', None, 'secret')
+        self.assertEqual(secret.to_env_from_secret(), k8s.V1EnvFromSource(
+            secret_ref=k8s.V1SecretEnvSource(name='secret')
+        ))
+
+    @mock.patch('uuid.uuid4')
+    def test_to_volume_secret(self, mock_uuid):
+        mock_uuid.return_value = '0'
+        secret = Secret('volume', '/etc/foo', 'secret_b')
+        self.assertEqual(secret.to_volume_secret(), (
+            k8s.V1Volume(
+                name='secretvol0',
+                secret=k8s.V1SecretVolumeSource(
+                    secret_name='secret_b'
+                )
+            ),
+            k8s.V1VolumeMount(
+                mount_path='/etc/foo',
+                name='secretvol0',
+                read_only=True
+            )
+        ))
+
+    @mock.patch('uuid.uuid4')
+    def test_attach_to_pod(self, mock_uuid):
+        mock_uuid.return_value = '0'
+        pod = PodGenerator(image='airflow-worker:latest',
+                           name='base').gen_pod()
+        secrets = [
+            # This should be a secretRef
+            Secret('env', None, 'secret_a'),
+            # This should be a single secret mounted in volumeMounts
+            Secret('volume', '/etc/foo', 'secret_b'),
+            # This should produce a single secret mounted in env
+            Secret('env', 'TARGET', 'secret_b', 'source_b'),
+        ]
+        k8s_client = ApiClient()
+        result = append_to_pod(pod, secrets)
+        result = k8s_client.sanitize_for_serialization(result)
+        self.assertEqual(result, {
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': {'name': 'base-0'},
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': [],
+                    'env': [{
+                        'name': 'TARGET',
+                        'valueFrom': {
+                            'secretKeyRef': {
+                                'key': 'source_b',
+                                'name': 'secret_b'
+                            }
+                        }
+                    }],
+                    'envFrom': [{'secretRef': {'name': 'secret_a'}}],
+                    'image': 'airflow-worker:latest',
+                    'imagePullPolicy': 'IfNotPresent',
+                    'name': 'base',
+                    'ports': [],
+                    'volumeMounts': [{
+                        'mountPath': '/etc/foo',
+                        'name': 'secretvol0',
+                        'readOnly': True}]
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'restartPolicy': 'Never',
+                'volumes': [{
+                    'name': 'secretvol0',
+                    'secret': {'secretName': 'secret_b'}
+                }]
+            }
+        })
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
new file mode 100644
index 0000000..00ad2bd
--- /dev/null
+++ b/tests/kubernetes/test_pod_generator.py
@@ -0,0 +1,328 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from tests.compat import mock
+import kubernetes.client.models as k8s
+from kubernetes.client import ApiClient
+from airflow.kubernetes.secret import Secret
+from airflow.kubernetes.pod_generator import PodGenerator, PodDefaults
+from airflow.kubernetes.pod import Resources
+from airflow.kubernetes.k8s_model import append_to_pod
+
+
+class TestPodGenerator(unittest.TestCase):
+
+    def setUp(self):
+        self.envs = {
+            'ENVIRONMENT': 'prod',
+            'LOG_LEVEL': 'warning'
+        }
+        self.secrets = [
+            # This should be a secretRef
+            Secret('env', None, 'secret_a'),
+            # This should be a single secret mounted in volumeMounts
+            Secret('volume', '/etc/foo', 'secret_b'),
+            # This should produce a single secret mounted in env
+            Secret('env', 'TARGET', 'secret_b', 'source_b'),
+        ]
+        self.resources = Resources('1Gi', 1, '2Gi', 2, 1)
+        self.k8s_client = ApiClient()
+        self.expected = {
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': {
+                'name': 'myapp-pod-0',
+                'labels': {'app': 'myapp'},
+                'namespace': 'default'
+            },
+            'spec': {
+                'containers': [{
+                    'name': 'base',
+                    'image': 'busybox',
+                    'args': [],
+                    'command': [
+                        'sh', '-c', 'echo Hello Kubernetes!'
+                    ],
+                    'imagePullPolicy': 'IfNotPresent',
+                    'env': [{
+                        'name': 'ENVIRONMENT',
+                        'value': 'prod'
+                    }, {
+                        'name': 'LOG_LEVEL',
+                        'value': 'warning'
+                    }, {
+                        'name': 'TARGET',
+                        'valueFrom': {
+                            'secretKeyRef': {
+                                'name': 'secret_b',
+                                'key': 'source_b'
+                            }
+                        }
+                    }],
+                    'envFrom': [{
+                        'configMapRef': {
+                            'name': 'configmap_a'
+                        }
+                    }, {
+                        'configMapRef': {
+                            'name': 'configmap_b'
+                        }
+                    }, {
+                        'secretRef': {
+                            'name': 'secret_a'
+                        }
+                    }],
+                    'resources': {
+                        'requests': {
+                            'memory': '1Gi',
+                            'cpu': 1
+                        },
+                        'limits': {
+                            'memory': '2Gi',
+                            'cpu': 2,
+                            'nvidia.com/gpu': 1
+                        },
+                    },
+                    'ports': [{'name': 'foo', 'containerPort': 1234}],
+                    'volumeMounts': [{
+                        'mountPath': '/etc/foo',
+                        'name': 'secretvol0',
+                        'readOnly': True
+                    }]
+                }],
+                'restartPolicy': 'Never',
+                'volumes': [{
+                    'name': 'secretvol0',
+                    'secret': {
+                        'secretName': 'secret_b'
+                    }
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [
+                    {'name': 'pull_secret_a'},
+                    {'name': 'pull_secret_b'}
+                ],
+                'securityContext': {
+                    'runAsUser': 1000,
+                    'fsGroup': 2000,
+                },
+            }
+        }
+
+    @mock.patch('uuid.uuid4')
+    def test_gen_pod(self, mock_uuid):
+        mock_uuid.return_value = '0'
+        pod_generator = PodGenerator(
+            labels={'app': 'myapp'},
+            name='myapp-pod',
+            image_pull_secrets='pull_secret_a,pull_secret_b',
+            image='busybox',
+            envs=self.envs,
+            cmds=['sh', '-c', 'echo Hello Kubernetes!'],
+            security_context=k8s.V1PodSecurityContext(
+                run_as_user=1000,
+                fs_group=2000,
+            ),
+            namespace='default',
+            ports=[k8s.V1ContainerPort(name='foo', container_port=1234)],
+            configmaps=['configmap_a', 'configmap_b']
+        )
+        result = pod_generator.gen_pod()
+        result = append_to_pod(result, self.secrets)
+        result = self.resources.attach_to_pod(result)
+        result_dict = self.k8s_client.sanitize_for_serialization(result)
+        # sort env and envFrom by name so the comparison is order-independent
+        result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
+        result_dict['spec']['containers'][0]['envFrom'].sort(
+            key=lambda x: list(x.values())[0]['name']
+        )
+        self.assertDictEqual(result_dict, self.expected)
+
+    @mock.patch('uuid.uuid4')
+    def test_gen_pod_extract_xcom(self, mock_uuid):
+        mock_uuid.return_value = '0'
+        pod_generator = PodGenerator(
+            labels={'app': 'myapp'},
+            name='myapp-pod',
+            image_pull_secrets='pull_secret_a,pull_secret_b',
+            image='busybox',
+            envs=self.envs,
+            cmds=['sh', '-c', 'echo Hello Kubernetes!'],
+            namespace='default',
+            security_context=k8s.V1PodSecurityContext(
+                run_as_user=1000,
+                fs_group=2000,
+            ),
+            ports=[k8s.V1ContainerPort(name='foo', container_port=1234)],
+            configmaps=['configmap_a', 'configmap_b']
+        )
+        pod_generator.extract_xcom = True
+        result = pod_generator.gen_pod()
+        result = append_to_pod(result, self.secrets)
+        result = self.resources.attach_to_pod(result)
+        result_dict = self.k8s_client.sanitize_for_serialization(result)
+        container_two = {
+            'name': 'airflow-xcom-sidecar',
+            'image': 'python:3.5-alpine',
+            'command': ['python', '-c', PodDefaults.XCOM_CMD],
+            'volumeMounts': [
+                {
+                    'name': 'xcom',
+                    'mountPath': '/airflow/xcom'
+                }
+            ]
+        }
+        self.expected['spec']['containers'].append(container_two)
+        self.expected['spec']['containers'][0]['volumeMounts'].insert(0, {
+            'name': 'xcom',
+            'mountPath': '/airflow/xcom'
+        })
+        self.expected['spec']['volumes'].insert(0, {
+            'name': 'xcom', 'emptyDir': {}
+        })
+        result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
+        self.assertEqual(result_dict, self.expected)
+
+    @mock.patch('uuid.uuid4')
+    def test_from_obj(self, mock_uuid):
+        mock_uuid.return_value = '0'
+        result = PodGenerator.from_obj({
+            "KubernetesExecutor": {
+                "annotations": {"test": "annotation"},
+                "volumes": [
+                    {
+                        "name": "example-kubernetes-test-volume",
+                        "hostPath": {"path": "/tmp/"},
+                    },
+                ],
+                "volume_mounts": [
+                    {
+                        "mountPath": "/foo/",
+                        "name": "example-kubernetes-test-volume",
+                    },
+                ],
+                "securityContext": {
+                    "runAsUser": 1000
+                }
+            }
+        })
+        result = self.k8s_client.sanitize_for_serialization(result)
+
+        self.assertEqual({
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': {
+                'annotations': {'test': 'annotation'},
+            },
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': [],
+                    'env': [],
+                    'envFrom': [],
+                    'name': 'base',
+                    'ports': [],
+                    'volumeMounts': [{
+                        'mountPath': '/foo/',
+                        'name': 'example-kubernetes-test-volume'
+                    }],
+                }],
+                'imagePullSecrets': [],
+                'volumes': [{
+                    'hostPath': {'path': '/tmp/'},
+                    'name': 'example-kubernetes-test-volume'
+                }],
+            }
+        }, result)
+
+    def test_reconcile_pods(self):
+        with mock.patch('uuid.uuid4') as mock_uuid:
+            mock_uuid.return_value = '0'
+            base_pod = PodGenerator(
+                image='image1',
+                name='name1',
+                envs={'key1': 'val1'},
+                cmds=['/bin/command1.sh', 'arg1'],
+                ports=k8s.V1ContainerPort(name='port', container_port=2118),
+                volumes=[{
+                    'hostPath': {'path': '/tmp/'},
+                    'name': 'example-kubernetes-test-volume1'
+                }],
+                volume_mounts=[{
+                    'mountPath': '/foo/',
+                    'name': 'example-kubernetes-test-volume1'
+                }],
+            ).gen_pod()
+
+            mutator_pod = PodGenerator(
+                envs={'key2': 'val2'},
+                image='',
+                name='name2',
+                cmds=['/bin/command2.sh', 'arg2'],
+                volumes=[{
+                    'hostPath': {'path': '/tmp/'},
+                    'name': 'example-kubernetes-test-volume2'
+                }],
+                volume_mounts=[{
+                    'mountPath': '/foo/',
+                    'name': 'example-kubernetes-test-volume2'
+                }]
+            ).gen_pod()
+
+            result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
+            result = self.k8s_client.sanitize_for_serialization(result)
+            self.assertEqual(result, {
+                'apiVersion': 'v1',
+                'kind': 'Pod',
+                'metadata': {'name': 'name2-0'},
+                'spec': {
+                    'containers': [{
+                        'args': [],
+                        'command': ['/bin/command1.sh', 'arg1'],
+                        'env': [
+                            {'name': 'key1', 'value': 'val1'},
+                            {'name': 'key2', 'value': 'val2'}
+                        ],
+                        'envFrom': [],
+                        'image': 'image1',
+                        'imagePullPolicy': 'IfNotPresent',
+                        'name': 'base',
+                        'ports': {
+                            'containerPort': 2118,
+                            'name': 'port',
+                        },
+                        'volumeMounts': [{
+                            'mountPath': '/foo/',
+                            'name': 'example-kubernetes-test-volume1'
+                        }, {
+                            'mountPath': '/foo/',
+                            'name': 'example-kubernetes-test-volume2'
+                        }]
+                    }],
+                    'hostNetwork': False,
+                    'imagePullSecrets': [],
+                    'restartPolicy': 'Never',
+                    'volumes': [{
+                        'hostPath': {'path': '/tmp/'},
+                        'name': 'example-kubernetes-test-volume1'
+                    }, {
+                        'hostPath': {'path': '/tmp/'},
+                        'name': 'example-kubernetes-test-volume2'
+                    }]
+                }
+            })
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index 422d3db..9e0c288 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -30,11 +30,13 @@ class TestPodLauncher(unittest.TestCase):
         self.pod_launcher = PodLauncher(kube_client=self.mock_kube_client)
 
     def test_read_pod_logs_successfully_returns_logs(self):
+        mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.return_value = mock.sentinel.logs
         logs = self.pod_launcher.read_pod_logs(mock.sentinel)
         self.assertEqual(mock.sentinel.logs, logs)
 
     def test_read_pod_logs_retries_successfully(self):
+        mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.side_effect = [
             BaseHTTPError('Boom'),
             mock.sentinel.logs
@@ -46,21 +48,22 @@ class TestPodLauncher(unittest.TestCase):
                 _preload_content=False,
                 container='base',
                 follow=True,
-                name=mock.sentinel.name,
-                namespace=mock.sentinel.namespace,
+                name=mock.sentinel.metadata.name,
+                namespace=mock.sentinel.metadata.namespace,
                 tail_lines=10
             ),
             mock.call(
                 _preload_content=False,
                 container='base',
                 follow=True,
-                name=mock.sentinel.name,
-                namespace=mock.sentinel.namespace,
+                name=mock.sentinel.metadata.name,
+                namespace=mock.sentinel.metadata.namespace,
                 tail_lines=10
             )
         ])
 
     def test_read_pod_logs_retries_fails(self):
+        mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.side_effect = [
             BaseHTTPError('Boom'),
             BaseHTTPError('Boom'),
@@ -73,11 +76,13 @@ class TestPodLauncher(unittest.TestCase):
         )
 
     def test_read_pod_returns_logs(self):
+        mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod.return_value = mock.sentinel.pod_info
         pod_info = self.pod_launcher.read_pod(mock.sentinel)
         self.assertEqual(mock.sentinel.pod_info, pod_info)
 
     def test_read_pod_retries_successfully(self):
+        mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod.side_effect = [
             BaseHTTPError('Boom'),
             mock.sentinel.pod_info
@@ -85,11 +90,12 @@ class TestPodLauncher(unittest.TestCase):
         pod_info = self.pod_launcher.read_pod(mock.sentinel)
         self.assertEqual(mock.sentinel.pod_info, pod_info)
         self.mock_kube_client.read_namespaced_pod.assert_has_calls([
-            mock.call(mock.sentinel.name, mock.sentinel.namespace),
-            mock.call(mock.sentinel.name, mock.sentinel.namespace)
+            mock.call(mock.sentinel.metadata.name, mock.sentinel.metadata.namespace),
+            mock.call(mock.sentinel.metadata.name, mock.sentinel.metadata.namespace)
         ])
 
     def test_read_pod_retries_fails(self):
+        mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod.side_effect = [
             BaseHTTPError('Boom'),
             BaseHTTPError('Boom'),
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
new file mode 100644
index 0000000..1b15d98
--- /dev/null
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -0,0 +1,606 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+import uuid
+from datetime import datetime
+from tests.compat import mock
+from tests.test_utils.config import conf_vars
+try:
+    from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
+    from airflow.executors.kubernetes_executor import KubeConfig
+    from airflow.kubernetes.worker_configuration import WorkerConfiguration
+    from airflow.kubernetes.pod_generator import PodGenerator
+    from airflow.exceptions import AirflowConfigException
+    from airflow.kubernetes.secret import Secret
+    import kubernetes.client.models as k8s
+    from kubernetes.client.api_client import ApiClient
+except ImportError:
+    AirflowKubernetesScheduler = None  # type: ignore
+
+
+class TestKubernetesWorkerConfiguration(unittest.TestCase):
+    """
+    Tests that the dags_volume_subpath/logs_volume_subpath configuration
+    options (among others) are passed to the worker pod config
+    """
+
+    affinity_config = {
+        'podAntiAffinity': {
+            'requiredDuringSchedulingIgnoredDuringExecution': [
+                {
+                    'topologyKey': 'kubernetes.io/hostname',
+                    'labelSelector': {
+                        'matchExpressions': [
+                            {
+                                'key': 'app',
+                                'operator': 'In',
+                                'values': ['airflow']
+                            }
+                        ]
+                    }
+                }
+            ]
+        }
+    }
+
+    tolerations_config = [
+        {
+            'key': 'dedicated',
+            'operator': 'Equal',
+            'value': 'airflow'
+        },
+        {
+            'key': 'prod',
+            'operator': 'Exists'
+        }
+    ]
+
+    def setUp(self):
+        if AirflowKubernetesScheduler is None:
+            self.skipTest("kubernetes python package is not installed")
+
+        self.kube_config = mock.MagicMock()
+        self.kube_config.airflow_home = '/'
+        self.kube_config.airflow_dags = 'dags'
+        self.kube_config.airflow_logs = 'logs'
+        self.kube_config.dags_volume_subpath = None
+        self.kube_config.logs_volume_subpath = None
+        self.kube_config.dags_in_image = False
+        self.kube_config.dags_folder = None
+        self.kube_config.git_dags_folder_mount_point = None
+        self.kube_config.kube_labels = {'dag_id': 'original_dag_id', 'my_label': 'label_id'}
+        self.api_client = ApiClient()
+
+    @conf_vars({
+        ('kubernetes', 'git_ssh_known_hosts_configmap_name'): 'airflow-configmap',
+        ('kubernetes', 'git_ssh_key_secret_name'): 'airflow-secrets',
+        ('kubernetes', 'git_user'): 'some-user',
+        ('kubernetes', 'git_password'): 'some-password',
+        ('kubernetes', 'git_repo'): 'git@github.com:apache/airflow.git',
+        ('kubernetes', 'git_branch'): 'master',
+        ('kubernetes', 'git_dags_folder_mount_point'): '/usr/local/airflow/dags',
+        ('kubernetes', 'delete_worker_pods'): 'True',
+        ('kubernetes', 'kube_client_request_args'): '{"_request_timeout" : [60,360]}',
+    })
+    def test_worker_configuration_auth_both_ssh_and_user(self):
+        with self.assertRaisesRegexp(AirflowConfigException,
+                                     'either `git_user` and `git_password`.*'
+                                     'or `git_ssh_key_secret_name`.*'
+                                     'but not both$'):
+            KubeConfig()
+
+    def test_worker_with_subpaths(self):
+        self.kube_config.dags_volume_subpath = 'dags'
+        self.kube_config.logs_volume_subpath = 'logs'
+        self.kube_config.dags_volume_claim = 'dags'
+        self.kube_config.dags_folder = 'dags'
+        worker_config = WorkerConfiguration(self.kube_config)
+        volumes = worker_config._get_volumes()
+        volume_mounts = worker_config._get_volume_mounts()
+
+        for volume in volumes:
+            self.assertNotIn(
+                'subPath', self.api_client.sanitize_for_serialization(volume),
+                "subPath isn't valid configuration for a volume"
+            )
+
+        for volume_mount in volume_mounts:
+            if volume_mount.name != 'airflow-config':
+                self.assertIn(
+                    'subPath', self.api_client.sanitize_for_serialization(volume_mount),
+                    "subPath should've been passed to volumeMount configuration"
+                )
+
+    def test_worker_generate_dag_volume_mount_path(self):
+        self.kube_config.git_dags_folder_mount_point = '/root/airflow/git/dags'
+        self.kube_config.dags_folder = '/root/airflow/dags'
+        worker_config = WorkerConfiguration(self.kube_config)
+
+        self.kube_config.dags_volume_claim = 'airflow-dags'
+        self.kube_config.dags_volume_host = ''
+        dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
+        self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder)
+
+        self.kube_config.dags_volume_claim = ''
+        self.kube_config.dags_volume_host = '/host/airflow/dags'
+        dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
+        self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder)
+
+        self.kube_config.dags_volume_claim = ''
+        self.kube_config.dags_volume_host = ''
+        dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
+        self.assertEqual(dag_volume_mount_path,
+                         self.kube_config.git_dags_folder_mount_point)
+
+    def test_worker_environment_no_dags_folder(self):
+        self.kube_config.airflow_configmap = ''
+        self.kube_config.git_dags_folder_mount_point = ''
+        self.kube_config.dags_folder = ''
+        worker_config = WorkerConfiguration(self.kube_config)
+        env = worker_config._get_environment()
+
+        self.assertNotIn('AIRFLOW__CORE__DAGS_FOLDER', env)
+
+    def test_worker_environment_when_dags_folder_specified(self):
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_dags_folder_mount_point = ''
+        dags_folder = '/workers/path/to/dags'
+        self.kube_config.dags_folder = dags_folder
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        env = worker_config._get_environment()
+
+        self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
+
+    def test_worker_environment_dags_folder_using_git_sync(self):
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_sync_dest = 'repo'
+        self.kube_config.git_subpath = 'dags'
+        self.kube_config.git_dags_folder_mount_point = '/workers/path/to/dags'
+
+        dags_folder = '{}/{}/{}'.format(self.kube_config.git_dags_folder_mount_point,
+                                        self.kube_config.git_sync_dest,
+                                        self.kube_config.git_subpath)
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        env = worker_config._get_environment()
+
+        self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
+
+    def test_init_environment_using_git_sync_ssh_without_known_hosts(self):
+        # The git-sync init container built for SSH authentication without a known-hosts
+        # configmap must expose the key file and switch known-hosts checking off.
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
+        self.kube_config.git_ssh_known_hosts_configmap_name = None
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        init_containers = worker_config._get_init_containers()
+
+        self.assertTrue(init_containers)  # check not empty
+        env = init_containers[0].env
+
+        self.assertIn(k8s.V1EnvVar(name='GIT_SSH_KEY_FILE', value='/etc/git-secret/ssh'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_KNOWN_HOSTS', value='false'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_SSH', value='true'), env)
+
+    def test_init_environment_using_git_sync_ssh_with_known_hosts(self):
+        # Tests the init environment created with git-sync SSH authentication option is correct
+        # with known hosts file
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
+        self.kube_config.git_ssh_known_hosts_configmap_name = 'airflow-configmap'
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        init_containers = worker_config._get_init_containers()
+
+        self.assertTrue(init_containers)  # check not empty
+        env = init_containers[0].env
+
+        self.assertIn(k8s.V1EnvVar(name='GIT_SSH_KEY_FILE', value='/etc/git-secret/ssh'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_KNOWN_HOSTS', value='true'), env)
+        self.assertIn(k8s.V1EnvVar(
+            name='GIT_SSH_KNOWN_HOSTS_FILE',
+            value='/etc/git-secret/known_hosts'
+        ), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_SSH', value='true'), env)
+
+    def test_init_environment_using_git_sync_user_without_known_hosts(self):
+        # Tests the init environment created with git-sync User authentication option is correct
+        # without known hosts file
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_user = 'git_user'
+        self.kube_config.git_password = 'git_password'
+        self.kube_config.git_ssh_known_hosts_configmap_name = None
+        self.kube_config.git_ssh_key_secret_name = None
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        init_containers = worker_config._get_init_containers()
+
+        self.assertTrue(init_containers)  # check not empty
+        env = init_containers[0].env
+
+        self.assertNotIn(k8s.V1EnvVar(name='GIT_SSH_KEY_FILE', value='/etc/git-secret/ssh'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_USERNAME', value='git_user'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_PASSWORD', value='git_password'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_KNOWN_HOSTS', value='false'), env)
+        self.assertNotIn(k8s.V1EnvVar(
+            name='GIT_SSH_KNOWN_HOSTS_FILE',
+            value='/etc/git-secret/known_hosts'
+        ), env)
+        self.assertNotIn(k8s.V1EnvVar(name='GIT_SYNC_SSH', value='true'), env)
+
+    def test_init_environment_using_git_sync_user_with_known_hosts(self):
+        # Tests the init environment created with git-sync User authentication option is correct
+        # with known hosts file
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_user = 'git_user'
+        self.kube_config.git_password = 'git_password'
+        self.kube_config.git_ssh_known_hosts_configmap_name = 'airflow-configmap'
+        self.kube_config.git_ssh_key_secret_name = None
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        init_containers = worker_config._get_init_containers()
+
+        self.assertTrue(init_containers)  # check not empty
+        env = init_containers[0].env
+
+        self.assertNotIn(k8s.V1EnvVar(name='GIT_SSH_KEY_FILE', value='/etc/git-secret/ssh'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_USERNAME', value='git_user'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_SYNC_PASSWORD', value='git_password'), env)
+        self.assertIn(k8s.V1EnvVar(name='GIT_KNOWN_HOSTS', value='true'), env)
+        self.assertIn(k8s.V1EnvVar(
+            name='GIT_SSH_KNOWN_HOSTS_FILE',
+            value='/etc/git-secret/known_hosts'
+        ), env)
+        self.assertNotIn(k8s.V1EnvVar(name='GIT_SYNC_SSH', value='true'), env)
+
+    def test_init_environment_using_git_sync_run_as_user_empty(self):
+        # Tests that if git_sync_run_as_user is empty, no securityContext is created in the init container
+
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+        self.kube_config.git_sync_run_as_user = ''
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        init_containers = worker_config._get_init_containers()
+        self.assertTrue(init_containers)  # check not empty
+
+        self.assertIsNone(init_containers[0].security_context)
+
+    def test_make_pod_run_as_user_0(self):
+        # Tests the pod created with run-as-user 0 actually gets that in its config
+        self.kube_config.worker_run_as_user = 0
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+        self.kube_config.worker_fs_group = None
+        self.kube_config.git_dags_folder_mount_point = 'dags'
+        self.kube_config.git_sync_dest = 'repo'
+        self.kube_config.git_subpath = 'path'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+        self.assertEqual(0, pod.spec.security_context.run_as_user)
+
+    def test_make_pod_git_sync_ssh_without_known_hosts(self):
+        # Tests the pod created with git-sync SSH authentication option is correct without known hosts
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_ssh_key_secret_name = 'airflow-secrets'
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+        self.kube_config.worker_fs_group = None
+        self.kube_config.git_dags_folder_mount_point = 'dags'
+        self.kube_config.git_sync_dest = 'repo'
+        self.kube_config.git_subpath = 'path'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+
+        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+        init_containers = worker_config._get_init_containers()
+        git_ssh_key_file = next((x.value for x in init_containers[0].env
+                                if x.name == 'GIT_SSH_KEY_FILE'), None)
+        volume_mount_ssh_key = next((x.mount_path for x in init_containers[0].volume_mounts
+                                    if x.name == worker_config.git_sync_ssh_secret_volume_name),
+                                    None)
+        self.assertTrue(git_ssh_key_file)
+        self.assertTrue(volume_mount_ssh_key)
+        self.assertEqual(65533, pod.spec.security_context.fs_group)
+        self.assertEqual(git_ssh_key_file,
+                         volume_mount_ssh_key,
+                         'The location where the git ssh secret is mounted'
+                         ' needs to be the same as the GIT_SSH_KEY_FILE path')
+
+    def test_make_pod_git_sync_credentials_secret(self):
+        # Tests the pod created with git_sync_credentials_secret will get into the init container
+        self.kube_config.git_sync_credentials_secret = 'airflow-git-creds-secret'
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+        self.kube_config.worker_fs_group = None
+        self.kube_config.git_dags_folder_mount_point = 'dags'
+        self.kube_config.git_sync_dest = 'repo'
+        self.kube_config.git_subpath = 'path'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+
+        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+        username_env = k8s.V1EnvVar(
+            name='GIT_SYNC_USERNAME',
+            value_from=k8s.V1EnvVarSource(
+                secret_key_ref=k8s.V1SecretKeySelector(
+                    name=self.kube_config.git_sync_credentials_secret,
+                    key='GIT_SYNC_USERNAME')
+            )
+        )
+        password_env = k8s.V1EnvVar(
+            name='GIT_SYNC_PASSWORD',
+            value_from=k8s.V1EnvVarSource(
+                secret_key_ref=k8s.V1SecretKeySelector(
+                    name=self.kube_config.git_sync_credentials_secret,
+                    key='GIT_SYNC_PASSWORD')
+            )
+        )
+
+        self.assertIn(username_env, pod.spec.init_containers[0].env,
+                      'The username env for git credentials did not get into the init container')
+
+        self.assertIn(password_env, pod.spec.init_containers[0].env,
+                      'The password env for git credentials did not get into the init container')
+
+    def test_make_pod_git_sync_ssh_with_known_hosts(self):
+        # Tests the pod created with git-sync SSH authentication option is correct with known hosts
+        self.kube_config.airflow_configmap = 'airflow-configmap'
+        self.kube_config.git_ssh_secret_name = 'airflow-secrets'
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+
+        worker_config = WorkerConfiguration(self.kube_config)
+
+        init_containers = worker_config._get_init_containers()
+        git_ssh_known_hosts_file = next((x.value for x in init_containers[0].env
+                                         if x.name == 'GIT_SSH_KNOWN_HOSTS_FILE'), None)
+
+        volume_mount_ssh_known_hosts_file = next(
+            (x.mount_path for x in init_containers[0].volume_mounts
+             if x.name == worker_config.git_sync_ssh_known_hosts_volume_name),
+            None)
+        self.assertTrue(git_ssh_known_hosts_file)
+        self.assertTrue(volume_mount_ssh_known_hosts_file)
+        self.assertEqual(git_ssh_known_hosts_file,
+                         volume_mount_ssh_known_hosts_file,
+                         'The location where the git known hosts file is mounted'
+                         ' needs to be the same as the GIT_SSH_KNOWN_HOSTS_FILE path')
+
+    def test_make_pod_with_empty_executor_config(self):
+        self.kube_config.kube_affinity = self.affinity_config
+        self.kube_config.kube_tolerations = self.tolerations_config
+        self.kube_config.dags_folder = 'dags'
+        worker_config = WorkerConfiguration(self.kube_config)
+
+        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+        self.assertTrue(pod.spec.affinity['podAntiAffinity'] is not None)
+        self.assertEqual('app',
+                         pod.spec.affinity['podAntiAffinity']
+                         ['requiredDuringSchedulingIgnoredDuringExecution'][0]
+                         ['labelSelector']
+                         ['matchExpressions'][0]
+                         ['key'])
+
+        self.assertEqual(2, len(pod.spec.tolerations))
+        self.assertEqual('prod', pod.spec.tolerations[1]['key'])
+
+    def test_make_pod_with_executor_config(self):
+        self.kube_config.dags_folder = 'dags'
+        worker_config = WorkerConfiguration(self.kube_config)
+        config_pod = PodGenerator(
+            image='',
+            affinity=self.affinity_config,
+            tolerations=self.tolerations_config,
+        ).gen_pod()
+
+        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
+                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+
+        result = PodGenerator.reconcile_pods(pod, config_pod)
+
+        self.assertTrue(result.spec.affinity['podAntiAffinity'] is not None)
+        self.assertEqual('app',
+                         result.spec.affinity['podAntiAffinity']
+                         ['requiredDuringSchedulingIgnoredDuringExecution'][0]
+                         ['labelSelector']
+                         ['matchExpressions'][0]
+                         ['key'])
+
+        self.assertEqual(2, len(result.spec.tolerations))
+        self.assertEqual('prod', result.spec.tolerations[1]['key'])
+
+    def test_worker_pvc_dags(self):
+        # Tests the persistent volume claim config created when `dags_volume_claim` is set
+        self.kube_config.dags_volume_claim = 'airflow-dags'
+        self.kube_config.dags_folder = 'dags'
+        worker_config = WorkerConfiguration(self.kube_config)
+        volumes = worker_config._get_volumes()
+        volume_mounts = worker_config._get_volume_mounts()
+
+        init_containers = worker_config._get_init_containers()
+
+        dag_volume = [volume for volume in volumes if volume.name == 'airflow-dags']
+        dag_volume_mount = [mount for mount in volume_mounts if mount.name == 'airflow-dags']
+
+        self.assertEqual('airflow-dags', dag_volume[0].persistent_volume_claim.claim_name)
+        self.assertEqual(1, len(dag_volume_mount))
+        self.assertTrue(dag_volume_mount[0].read_only)
+        self.assertEqual(0, len(init_containers))
+
+    def test_worker_git_dags(self):
+        # Tests the emptyDir volume and git-sync init container config created when `git_repo` is set
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_folder = '/usr/local/airflow/dags'
+        self.kube_config.worker_dags_folder = '/usr/local/airflow/dags'
+
+        self.kube_config.git_sync_container_repository = 'gcr.io/google-containers/git-sync-amd64'
+        self.kube_config.git_sync_container_tag = 'v2.0.5'
+        self.kube_config.git_sync_container = 'gcr.io/google-containers/git-sync-amd64:v2.0.5'
+        self.kube_config.git_sync_init_container_name = 'git-sync-clone'
+        self.kube_config.git_subpath = 'dags_folder'
+        self.kube_config.git_sync_root = '/git'
+        self.kube_config.git_sync_run_as_user = 65533
+        self.kube_config.git_dags_folder_mount_point = '/usr/local/airflow/dags/repo/dags_folder'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        volumes = worker_config._get_volumes()
+        volume_mounts = worker_config._get_volume_mounts()
+
+        dag_volume = [volume for volume in volumes if volume.name == 'airflow-dags']
+        dag_volume_mount = [mount for mount in volume_mounts if mount.name == 'airflow-dags']
+
+        self.assertIsNotNone(dag_volume[0].empty_dir)
+        self.assertEqual(self.kube_config.git_dags_folder_mount_point, dag_volume_mount[0].mount_path)
+        self.assertTrue(dag_volume_mount[0].read_only)
+
+        init_container = worker_config._get_init_containers()[0]
+        init_container_volume_mount = [mount for mount in init_container.volume_mounts
+                                       if mount.name == 'airflow-dags']
+
+        self.assertEqual('git-sync-clone', init_container.name)
+        self.assertEqual('gcr.io/google-containers/git-sync-amd64:v2.0.5', init_container.image)
+        self.assertEqual(1, len(init_container_volume_mount))
+        self.assertFalse(init_container_volume_mount[0].read_only)
+        self.assertEqual(65533, init_container.security_context.run_as_user)
+
+    def test_worker_container_dags(self):
+        # Tests that the 'airflow-dags' volume is NOT created when `dags_in_image` is set
+        self.kube_config.dags_in_image = True
+        self.kube_config.dags_folder = 'dags'
+        worker_config = WorkerConfiguration(self.kube_config)
+        volumes = worker_config._get_volumes()
+        volume_mounts = worker_config._get_volume_mounts()
+
+        dag_volume = [volume for volume in volumes if volume.name == 'airflow-dags']
+        dag_volume_mount = [mount for mount in volume_mounts if mount.name == 'airflow-dags']
+
+        init_containers = worker_config._get_init_containers()
+
+        self.assertEqual(0, len(dag_volume))
+        self.assertEqual(0, len(dag_volume_mount))
+        self.assertEqual(0, len(init_containers))
+
+    def test_kubernetes_environment_variables(self):
+        # Tests the kubernetes environment variables get copied into the worker pods
+        input_environment = {
+            'ENVIRONMENT': 'prod',
+            'LOG_LEVEL': 'warning'
+        }
+        self.kube_config.kube_env_vars = input_environment
+        worker_config = WorkerConfiguration(self.kube_config)
+        env = worker_config._get_environment()
+        for key in input_environment:
+            self.assertIn(key, env)
+            self.assertIn(input_environment[key], env.values())
+
+        core_executor = 'AIRFLOW__CORE__EXECUTOR'
+        input_environment = {
+            core_executor: 'NotLocalExecutor'
+        }
+        self.kube_config.kube_env_vars = input_environment
+        worker_config = WorkerConfiguration(self.kube_config)
+        env = worker_config._get_environment()
+        self.assertEqual(env[core_executor], 'LocalExecutor')
+
+    def test_get_secrets(self):
+        # Test when secretRef is None and kube_secrets is not empty
+        self.kube_config.kube_secrets = {
+            'AWS_SECRET_KEY': 'airflow-secret=aws_secret_key',
+            'POSTGRES_PASSWORD': 'airflow-secret=postgres_credentials'
+        }
+        self.kube_config.env_from_secret_ref = None
+        worker_config = WorkerConfiguration(self.kube_config)
+        secrets = worker_config._get_secrets()
+        secrets.sort(key=lambda secret: secret.deploy_target)
+        expected = [
+            Secret('env', 'AWS_SECRET_KEY', 'airflow-secret', 'aws_secret_key'),
+            Secret('env', 'POSTGRES_PASSWORD', 'airflow-secret', 'postgres_credentials')
+        ]
+        self.assertListEqual(expected, secrets)
+
+        # Test when secret is not empty and kube_secrets is empty dict
+        self.kube_config.kube_secrets = {}
+        self.kube_config.env_from_secret_ref = 'secret_a,secret_b'
+        worker_config = WorkerConfiguration(self.kube_config)
+        secrets = worker_config._get_secrets()
+        expected = [
+            Secret('env', None, 'secret_a'),
+            Secret('env', None, 'secret_b')
+        ]
+        self.assertListEqual(expected, secrets)
+
+    def test_get_env_from(self):
+        # Test when configmap is empty
+        self.kube_config.env_from_configmap_ref = ''
+        worker_config = WorkerConfiguration(self.kube_config)
+        configmaps = worker_config._get_env_from()
+        self.assertListEqual([], configmaps)
+
+        # Test when configmap and secret refs are not empty
+        self.kube_config.env_from_configmap_ref = 'configmap_a,configmap_b'
+        self.kube_config.env_from_secret_ref = 'secretref_a,secretref_b'
+        worker_config = WorkerConfiguration(self.kube_config)
+        configmaps = worker_config._get_env_from()
+        self.assertListEqual([
+            k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='configmap_a')),
+            k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='configmap_b')),
+            k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name='secretref_a')),
+            k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name='secretref_b'))
+        ], configmaps)
+
+    def test_get_labels(self):
+        worker_config = WorkerConfiguration(self.kube_config)
+        labels = worker_config._get_labels({'my_kube_executor_label': 'kubernetes'}, {
+            'dag_id': 'override_dag_id',
+        })
+        self.assertEqual({
+            'my_label': 'label_id',
+            'dag_id': 'override_dag_id',
+            'my_kube_executor_label': 'kubernetes'
+        }, labels)
diff --git a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py b/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
index 96e06fe..c9bc741 100644
--- a/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
+++ b/tests/runtime/kubernetes/test_kubernetes_pod_operator.py
@@ -19,20 +19,22 @@ import json
 import os
 import shutil
 import unittest
-from tests.compat import mock
 
+import kubernetes.client.models as k8s
 import pytest
 from kubernetes.client.api_client import ApiClient
 from kubernetes.client.rest import ApiException
 
+from airflow.kubernetes.volume import Volume
 from airflow import AirflowException
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
 from airflow.kubernetes.pod import Port
+from airflow.kubernetes.pod_generator import PodDefaults
 from airflow.kubernetes.pod_launcher import PodLauncher
 from airflow.kubernetes.secret import Secret
-from airflow.kubernetes.volume import Volume
 from airflow.kubernetes.volume_mount import VolumeMount
 from airflow.version import version as airflow_version
+from tests.compat import mock
 
 
 @pytest.mark.runtime("kubernetes")
@@ -49,7 +51,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
                 'name': mock.ANY,
                 'annotations': {},
                 'labels': {
-                    'foo': 'bar', 'kubernetes_pod_operator': 'True',
+                    'foo': 'bar',
+                    'kubernetes_pod_operator': 'True',
                     'airflow_version': airflow_version.replace('+', '-')
                 }
             },
@@ -77,30 +80,10 @@ class TestKubernetesPodOperator(unittest.TestCase):
             }
         }
 
-    def test_do_xcom_push_defaults_false(self):
-        new_config_path = '/tmp/kube_config'
-        old_config_path = os.path.expanduser('~/.kube/config')
-        shutil.copy(old_config_path, new_config_path)
-
-        k = KubernetesPodOperator(
-            namespace='default',
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="test",
-            task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            config_file=new_config_path,
-        )
-        self.assertFalse(k.do_xcom_push)
-
     def test_config_path_move(self):
         new_config_path = '/tmp/kube_config'
         old_config_path = os.path.expanduser('~/.kube/config')
         shutil.copy(old_config_path, new_config_path)
-
         k = KubernetesPodOperator(
             namespace='default',
             image="ubuntu:16.04",
@@ -114,6 +97,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             config_file=new_config_path,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.assertEqual(self.expected_pod, actual_pod)
 
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
     @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
@@ -163,8 +148,10 @@ class TestKubernetesPodOperator(unittest.TestCase):
         )
         mock_launcher.return_value = (State.SUCCESS, None)
         k.execute(None)
-        self.assertEqual(mock_launcher.call_args[0][0].image_pull_secrets,
-                         fake_pull_secrets)
+        self.assertEqual(
+            mock_launcher.call_args[0][0].spec.image_pull_secrets,
+            [k8s.V1LocalObjectReference(name=fake_pull_secrets)]
+        )
 
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.delete_pod")
@@ -201,6 +188,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_delete_operator_pod(self):
         k = KubernetesPodOperator(
@@ -216,6 +205,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_hostnetwork(self):
         k = KubernetesPodOperator(
@@ -231,6 +222,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             hostnetwork=True,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['hostNetwork'] = True
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_dnspolicy(self):
         dns_policy = "ClusterFirstWithHostNet"
@@ -248,6 +242,10 @@ class TestKubernetesPodOperator(unittest.TestCase):
             dnspolicy=dns_policy,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['hostNetwork'] = True
+        self.expected_pod['spec']['dnsPolicy'] = dns_policy
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_node_selectors(self):
         node_selectors = {
@@ -266,6 +264,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             node_selectors=node_selectors,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['nodeSelector'] = node_selectors
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_resources(self):
         resources = {
@@ -287,6 +288,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             resources=resources,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['containers'][0]['resources'] = resources
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_affinity(self):
         affinity = {
@@ -319,6 +323,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             affinity=affinity,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['affinity'] = affinity
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_port(self):
         port = Port('http', 80)
@@ -336,6 +343,12 @@ class TestKubernetesPodOperator(unittest.TestCase):
             ports=[port],
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['containers'][0]['ports'] = [{
+            'name': 'http',
+            'containerPort': 80
+        }]
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_volume_mount(self):
         with mock.patch.object(PodLauncher, 'log') as mock_logger:
@@ -368,6 +381,20 @@ class TestKubernetesPodOperator(unittest.TestCase):
             )
             k.execute(None)
             mock_logger.info.assert_any_call(b"retrieved from mount\n")
+            actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+            self.expected_pod['spec']['containers'][0]['args'] = args
+            self.expected_pod['spec']['containers'][0]['volumeMounts'] = [{
+                'name': 'test-volume',
+                'mountPath': '/tmp/test_volume',
+                'readOnly': False
+            }]
+            self.expected_pod['spec']['volumes'] = [{
+                'name': 'test-volume',
+                'persistentVolumeClaim': {
+                    'claimName': 'test-volume'
+                }
+            }]
+            self.assertEqual(self.expected_pod, actual_pod)
 
     def test_run_as_user_root(self):
         security_context = {
@@ -388,6 +415,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             security_context=security_context,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['securityContext'] = security_context
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_run_as_user_non_root(self):
         security_context = {
@@ -409,6 +439,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             security_context=security_context,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['securityContext'] = security_context
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_fs_group(self):
         security_context = {
@@ -430,6 +463,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
             security_context=security_context,
         )
         k.execute(None)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['securityContext'] = security_context
+        self.assertEqual(self.expected_pod, actual_pod)
 
     def test_faulty_image(self):
         bad_image_name = "foobar"
@@ -447,6 +483,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
         )
         with self.assertRaises(AirflowException):
             k.execute(None)
+            actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+            self.expected_pod['spec']['containers'][0]['image'] = bad_image_name
+            self.assertEqual(self.expected_pod, actual_pod)
 
     def test_faulty_service_account(self):
         bad_service_account_name = "foobar"
@@ -465,6 +504,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
         )
         with self.assertRaises(ApiException):
             k.execute(None)
+            actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+            self.expected_pod['spec']['serviceAccountName'] = bad_service_account_name
+            self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_failure(self):
         """
@@ -484,6 +526,9 @@ class TestKubernetesPodOperator(unittest.TestCase):
         )
         with self.assertRaises(AirflowException):
             k.execute(None)
+            actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+            self.expected_pod['spec']['containers'][0]['args'] = bad_internal_command
+            self.assertEqual(self.expected_pod, actual_pod)
 
     def test_xcom_push(self):
         return_value = '{"foo": "bar"\n, "buzz": 2}'
@@ -500,13 +545,23 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=True,
         )
         self.assertEqual(k.execute(None), json.loads(return_value))
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        volume = self.api_client.sanitize_for_serialization(PodDefaults.VOLUME)
+        volume_mount = self.api_client.sanitize_for_serialization(PodDefaults.VOLUME_MOUNT)
+        container = self.api_client.sanitize_for_serialization(PodDefaults.SIDECAR_CONTAINER)
+        self.expected_pod['spec']['containers'][0]['args'] = args
+        self.expected_pod['spec']['containers'][0]['volumeMounts'].insert(0, volume_mount)
+        self.expected_pod['spec']['volumes'].insert(0, volume)
+        self.expected_pod['spec']['containers'].append(container)
+        self.assertEqual(self.expected_pod, actual_pod)
 
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
     @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
     def test_envs_from_configmaps(self, mock_client, mock_launcher):
         # GIVEN
         from airflow.utils.state import State
-        configmaps = ['test-configmap']
+
+        configmap = 'test-configmap'
         # WHEN
         k = KubernetesPodOperator(
             namespace='default',
@@ -518,12 +573,17 @@ class TestKubernetesPodOperator(unittest.TestCase):
             task_id="task",
             in_cluster=False,
             do_xcom_push=False,
-            configmaps=configmaps
+            configmaps=[configmap]
         )
         # THEN
         mock_launcher.return_value = (State.SUCCESS, None)
         k.execute(None)
-        self.assertEqual(mock_launcher.call_args[0][0].configmaps, configmaps)
+        self.assertEqual(
+            mock_launcher.call_args[0][0].spec.containers[0].env_from,
+            [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(
+                name=configmap
+            ))]
+        )
 
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.run_pod")
     @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
@@ -548,7 +608,12 @@ class TestKubernetesPodOperator(unittest.TestCase):
         # THEN
         mock_launcher.return_value = (State.SUCCESS, None)
         k.execute(None)
-        self.assertEqual(mock_launcher.call_args[0][0].secrets, secrets)
+        self.assertEqual(
+            mock_launcher.call_args[0][0].spec.containers[0].env_from,
+            [k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(
+                name=secret_ref
+            ))]
+        )
 
 
 # pylint: enable=unused-argument
diff --git a/tests/test_local_settings.py b/tests/test_local_settings.py
index fdc6cb9..3497ee2 100644
--- a/tests/test_local_settings.py
+++ b/tests/test_local_settings.py
@@ -23,8 +23,6 @@ import tempfile
 import unittest
 from tests.compat import MagicMock, Mock, call, patch
 
-from airflow.kubernetes.pod import Pod
-
 
 SETTINGS_FILE_POLICY = """
 def test_policy(task_instance):
@@ -149,7 +147,7 @@ class LocalSettingsTest(unittest.TestCase):
             from airflow import settings
             settings.import_local_settings()  # pylint: ignore
 
-            pod = Pod(image="ubuntu", envs={}, cmds=['echo "1"'])
+            pod = MagicMock()
             settings.pod_mutation_hook(pod)
 
             assert pod.namespace == 'airflow-tests'