You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/25 19:33:18 UTC

[airflow] branch v1-10-test updated (48922f2 -> 6010ee2)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    omit 48922f2  [AIRFLOW-5413] Refactor worker config (#7114)
     new 7293991  [AIRFLOW-5413] Refactor worker config (#7114)
     new 6010ee2  [AIRFLOW-5641] Support running git sync container as root (#6312)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (48922f2)
            \
             N -- N -- N   refs/heads/v1-10-test (6010ee2)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/executors/kubernetes_executor.py      |  7 ++++---
 airflow/kubernetes/pod_generator.py           |  3 ++-
 airflow/kubernetes/worker_configuration.py    |  2 +-
 tests/executors/test_kubernetes_executor.py   |  8 ++++----
 tests/kubernetes/test_worker_configuration.py | 15 +++++++++++++++
 5 files changed, 26 insertions(+), 9 deletions(-)


[airflow] 02/02: [AIRFLOW-5641] Support running git sync container as root (#6312)

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6010ee2f8aeea70cb24a3b4a7870ffc814261a09
Author: Qingping Hou <qp...@scribd.com>
AuthorDate: Tue Oct 15 03:58:31 2019 -0700

    [AIRFLOW-5641] Support running git sync container as root (#6312)
    
    
    (cherry picked from commit 133085eb47e04683ce3dca52b967aa41f8139613)
---
 airflow/executors/kubernetes_executor.py      |  5 +++--
 airflow/kubernetes/worker_configuration.py    |  2 +-
 tests/kubernetes/test_worker_configuration.py | 15 +++++++++++++++
 3 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 74e504e..e3be2ef 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -20,6 +20,7 @@ import json
 import multiprocessing
 import time
 from queue import Empty
+from typing import Union
 from uuid import uuid4
 
 import kubernetes
@@ -210,10 +211,10 @@ class KubeConfig:
 
     # pod security context items should return integers
     # and only return a blank string if contexts are not set.
-    def _get_security_context_val(self, scontext):
+    def _get_security_context_val(self, scontext: str) -> Union[str, int]:
         val = conf.get(self.kubernetes_section, scontext)
         if not val:
-            return 0
+            return ""
         else:
             return int(val)
 
diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py
index 3464e81..820763b 100644
--- a/airflow/kubernetes/worker_configuration.py
+++ b/airflow/kubernetes/worker_configuration.py
@@ -163,7 +163,7 @@ class WorkerConfiguration(LoggingMixin):
 
         if self.kube_config.git_sync_run_as_user != "":
             init_containers.security_context = k8s.V1SecurityContext(
-                run_as_user=self.kube_config.git_sync_run_as_user or 65533
+                run_as_user=self.kube_config.git_sync_run_as_user
             )  # git-sync user
 
         return [init_containers]
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
index 74009a1..73b3f20 100644
--- a/tests/kubernetes/test_worker_configuration.py
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -305,6 +305,21 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
 
         self.assertIsNone(init_containers[0].security_context)
 
+    def test_init_environment_using_git_sync_run_as_user_root(self):
+        # Tests if git_syn_run_as_user is '0', securityContext is created with
+        # the right uid
+
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+        self.kube_config.git_sync_run_as_user = 0
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        init_containers = worker_config._get_init_containers()
+        self.assertTrue(init_containers)  # check not empty
+
+        self.assertEqual(0, init_containers[0].security_context.run_as_user)
+
     def test_make_pod_run_as_user_0(self):
         # Tests the pod created with run-as-user 0 actually gets that in it's config
         self.kube_config.worker_run_as_user = 0


[airflow] 01/02: [AIRFLOW-5413] Refactor worker config (#7114)

Posted by di...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 729399141d8dd46a25a594a693451ee55fd9ce75
Author: davlum <da...@gmail.com>
AuthorDate: Thu Jan 9 15:39:05 2020 -0500

    [AIRFLOW-5413] Refactor worker config (#7114)
    
    (cherry picked from commit 51f262c65afd7eaecc54661a3b5c4e533feecff8)
---
 .github/workflows/ci.yml                           |   2 +-
 .../contrib/operators/kubernetes_pod_operator.py   |   4 +-
 airflow/executors/kubernetes_executor.py           |  13 +-
 airflow/kubernetes/pod_generator.py                | 259 ++++++++--
 airflow/kubernetes/worker_configuration.py         |  18 +-
 tests/executors/test_kubernetes_executor.py        | 100 ++--
 tests/kubernetes/test_pod_generator.py             | 541 ++++++++++++++++++---
 tests/kubernetes/test_worker_configuration.py      |  95 +++-
 8 files changed, 838 insertions(+), 194 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index fb16aaf..67b9e50 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -144,7 +144,7 @@ jobs:
       - name: Cache virtualenv for kubernetes testing
         uses: actions/cache@v2
         env:
-          cache-name: cache-kubernetes-tests-virtualenv-v2
+          cache-name: cache-kubernetes-tests-virtualenv-v3
         with:
           path: .build/.kubernetes_venv
           key: "${{ env.cache-name }}-${{ github.job }}-\
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 8adb131..d439eda 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -367,11 +367,11 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                 pod,
                 startup_timeout=self.startup_timeout_seconds)
             final_state, result = launcher.monitor_pod(pod=pod, get_logs=self.get_logs)
-        except AirflowException:
+        except AirflowException as ex:
             if self.log_events_on_failure:
                 for event in launcher.read_pod_events(pod).items:
                     self.log.error("Pod Event: %s - %s", event.reason, event.message)
-            raise
+            raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
         finally:
             if self.is_delete_operator_pod:
                 launcher.delete_pod(pod)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index d458d7a..74e504e 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -71,7 +71,7 @@ class KubeConfig:
             self.kubernetes_section, "worker_container_image_pull_policy"
         )
         self.kube_node_selectors = configuration_dict.get('kubernetes_node_selectors', {})
-        self.kube_annotations = configuration_dict.get('kubernetes_annotations', {})
+        self.kube_annotations = configuration_dict.get('kubernetes_annotations', {}) or None
         self.kube_labels = configuration_dict.get('kubernetes_labels', {})
         self.delete_worker_pods = conf.getboolean(
             self.kubernetes_section, 'delete_worker_pods')
@@ -357,7 +357,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
         self.log.debug("Kubernetes using namespace %s", self.namespace)
         self.kube_client = kube_client
         self.launcher = PodLauncher(kube_client=self.kube_client)
-        self.worker_configuration = WorkerConfiguration(kube_config=self.kube_config)
+        self.worker_configuration_pod = WorkerConfiguration(kube_config=self.kube_config).as_pod()
         self._manager = multiprocessing.Manager()
         self.watcher_queue = self._manager.Queue()
         self.worker_uuid = worker_uuid
@@ -393,19 +393,20 @@ class AirflowKubernetesScheduler(LoggingMixin):
         if command[0:2] != ["airflow", "run"]:
             raise ValueError('The command must start with ["airflow", "run"].')
 
-        config_pod = self.worker_configuration.make_pod(
+        pod = PodGenerator.construct_pod(
             namespace=self.namespace,
             worker_uuid=self.worker_uuid,
             pod_id=self._create_pod_id(dag_id, task_id),
             dag_id=pod_generator.make_safe_label_value(dag_id),
             task_id=pod_generator.make_safe_label_value(task_id),
             try_number=try_number,
-            execution_date=self._datetime_to_label_safe_datestring(execution_date),
-            airflow_command=command
+            date=self._datetime_to_label_safe_datestring(execution_date),
+            command=command,
+            kube_executor_config=kube_executor_config,
+            worker_config=self.worker_configuration_pod
         )
         # Reconcile the pod generated by the Operator and the Pod
         # generated by the .cfg file
-        pod = PodGenerator.reconcile_pods(config_pod, kube_executor_config)
         self.log.debug("Kubernetes running for command %s", command)
         self.log.debug("Kubernetes launching image %s", pod.spec.containers[0].image)
 
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index a614f41..bf0cedf 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -28,7 +28,7 @@ import uuid
 
 import kubernetes.client.models as k8s
 
-from airflow.executors import Executors
+from airflow.version import version as airflow_version
 
 MAX_LABEL_LEN = 63
 
@@ -87,28 +87,59 @@ class PodGenerator:
     Contains Kubernetes Airflow Worker configuration logic
 
     Represents a kubernetes pod and manages execution of a single pod.
+    Any configuration that is container specific gets applied to
+    the first container in the list of containers.
+
+    Parameters with a type of `kubernetes.client.models.*`/`k8s.*` can
+    often be replaced with their dictionary equivalent, for example the output of
+    `sanitize_for_serialization`.
+
     :param image: The docker image
-    :type image: str
+    :type image: Optional[str]
+    :param name: name in the metadata section (not the container name)
+    :type name: Optional[str]
+    :param namespace: pod namespace
+    :type namespace: Optional[str]
+    :param volume_mounts: list of kubernetes volumes mounts
+    :type volume_mounts: Optional[List[Union[k8s.V1VolumeMount, dict]]]
     :param envs: A dict containing the environment variables
-    :type envs: Dict[str, str]
-    :param cmds: The command to be run on the pod
-    :type cmds: List[str]
-    :param secrets: Secrets to be launched to the pod
-    :type secrets: List[airflow.kubernetes.models.secret.Secret]
+    :type envs: Optional[Dict[str, str]]
+    :param cmds: The command to be run on the first container
+    :type cmds: Optional[List[str]]
+    :param args: The arguments to be run on the pod
+    :type args: Optional[List[str]]
+    :param labels: labels for the pod metadata
+    :type labels: Optional[Dict[str, str]]
+    :param node_selectors: node selectors for the pod
+    :type node_selectors: Optional[Dict[str, str]]
+    :param ports: list of ports. Applies to the first container.
+    :type ports: Optional[List[Union[k8s.V1ContainerPort, dict]]]
+    :param volumes: Volumes to be attached to the first container
+    :type volumes: Optional[List[Union[k8s.V1Volume, dict]]]
     :param image_pull_policy: Specify a policy to cache or always pull an image
     :type image_pull_policy: str
+    :param restart_policy: The restart policy of the pod
+    :type restart_policy: str
     :param image_pull_secrets: Any image pull secrets to be given to the pod.
         If more than one secret is required, provide a comma separated list:
         secret_a,secret_b
     :type image_pull_secrets: str
+    :param init_containers: A list of init containers
+    :type init_containers: Optional[List[k8s.V1Container]]
+    :param service_account_name: Identity for processes that run in a Pod
+    :type service_account_name: Optional[str]
+    :param resources: Resource requirements for the first containers
+    :type resources: Optional[Union[k8s.V1ResourceRequirements, dict]]
+    :param annotations: annotations for the pod
+    :type annotations: Optional[Dict[str, str]]
     :param affinity: A dict containing a group of affinity scheduling rules
-    :type affinity: dict
+    :type affinity: Optional[dict]
     :param hostnetwork: If True enable host networking on the pod
     :type hostnetwork: bool
     :param tolerations: A list of kubernetes tolerations
-    :type tolerations: list
+    :type tolerations: Optional[list]
     :param security_context: A dict containing the security context for the pod
-    :type security_context: dict
+    :type security_context: Optional[Union[k8s.V1PodSecurityContext, dict]]
     :param configmaps: Any configmap refs to envfrom.
         If more than one configmap is required, provide a comma separated list
         configmap_a,configmap_b
@@ -117,11 +148,13 @@ class PodGenerator:
     :type dnspolicy: str
     :param pod: The fully specified pod.
     :type pod: kubernetes.client.models.V1Pod
+    :param extract_xcom: Whether to bring up a container for xcom
+    :type extract_xcom: bool
     """
 
     def __init__(
         self,
-        image,
+        image=None,
         name=None,
         namespace=None,
         volume_mounts=None,
@@ -225,10 +258,11 @@ class PodGenerator:
             result.metadata = self.metadata
             result.spec.containers = [self.container]
 
+        result.metadata.name = self.make_unique_pod_id(result.metadata.name)
+
         if self.extract_xcom:
             result = self.add_sidecar(result)
 
-        result.metadata.name = self.make_unique_pod_id(result.metadata.name)
         return result
 
     @staticmethod
@@ -252,8 +286,9 @@ class PodGenerator:
     @staticmethod
     def add_sidecar(pod):
         pod_cp = copy.deepcopy(pod)
-
+        pod_cp.spec.volumes = pod.spec.volumes or []
         pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)
+        pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or []
         pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT)
         pod_cp.spec.containers.append(PodDefaults.SIDECAR_CONTAINER)
 
@@ -262,7 +297,7 @@ class PodGenerator:
     @staticmethod
     def from_obj(obj):
         if obj is None:
-            return k8s.V1Pod()
+            return None
 
         if isinstance(obj, PodGenerator):
             return obj.gen_pod()
@@ -272,7 +307,12 @@ class PodGenerator:
                 'Cannot convert a non-dictionary or non-PodGenerator '
                 'object into a KubernetesExecutorConfig')
 
-        namespaced = obj.get(Executors.KubernetesExecutor, {})
+        # We do not want to extract constant here from ExecutorLoader because it is just
+        # A name in dictionary rather than executor selection mechanism and it causes cyclic import
+        namespaced = obj.get("KubernetesExecutor", {})
+
+        if not namespaced:
+            return None
 
         resources = namespaced.get('resources')
 
@@ -348,46 +388,159 @@ class PodGenerator:
         should be preserved from base, the volumes appended to and
         the other fields overwritten.
         """
+        if client_pod is None:
+            return base_pod
 
         client_pod_cp = copy.deepcopy(client_pod)
+        client_pod_cp.spec = PodGenerator.reconcile_specs(base_pod.spec, client_pod_cp.spec)
 
-        def merge_objects(base_obj, client_obj):
-            for base_key in base_obj.to_dict().keys():
-                base_val = getattr(base_obj, base_key, None)
-                if not getattr(client_obj, base_key, None) and base_val:
-                    setattr(client_obj, base_key, base_val)
-
-        def extend_object_field(base_obj, client_obj, field_name):
-            base_obj_field = getattr(base_obj, field_name, None)
-            client_obj_field = getattr(client_obj, field_name, None)
-            if not base_obj_field:
-                return
-            if not client_obj_field:
-                setattr(client_obj, field_name, base_obj_field)
-                return
-            appended_fields = base_obj_field + client_obj_field
-            setattr(client_obj, field_name, appended_fields)
-
-        # Values at the pod and metadata should be overwritten where they exist,
-        # but certain values at the spec and container level must be conserved.
-        base_container = base_pod.spec.containers[0]
-        client_container = client_pod_cp.spec.containers[0]
-
-        extend_object_field(base_container, client_container, 'volume_mounts')
-        extend_object_field(base_container, client_container, 'env')
-        extend_object_field(base_container, client_container, 'env_from')
-        extend_object_field(base_container, client_container, 'ports')
-        extend_object_field(base_container, client_container, 'volume_devices')
-        client_container.command = base_container.command
-        client_container.args = base_container.args
-        merge_objects(base_pod.spec.containers[0], client_pod_cp.spec.containers[0])
-        # Just append any additional containers from the base pod
-        client_pod_cp.spec.containers.extend(base_pod.spec.containers[1:])
-
-        merge_objects(base_pod.metadata, client_pod_cp.metadata)
-
-        extend_object_field(base_pod.spec, client_pod_cp.spec, 'volumes')
-        merge_objects(base_pod.spec, client_pod_cp.spec)
-        merge_objects(base_pod, client_pod_cp)
+        client_pod_cp.metadata = merge_objects(base_pod.metadata, client_pod_cp.metadata)
+        client_pod_cp = merge_objects(base_pod, client_pod_cp)
 
         return client_pod_cp
+
+    @staticmethod
+    def reconcile_specs(base_spec,
+                        client_spec):
+        """
+        :param base_spec: has the base attributes which are overwritten if they exist
+            in the client_spec and remain if they do not exist in the client_spec
+        :type base_spec: k8s.V1PodSpec
+        :param client_spec: the spec that the client wants to create.
+        :type client_spec: k8s.V1PodSpec
+        :return: the merged specs
+        """
+        if base_spec and not client_spec:
+            return base_spec
+        if not base_spec and client_spec:
+            return client_spec
+        elif client_spec and base_spec:
+            client_spec.containers = PodGenerator.reconcile_containers(
+                base_spec.containers, client_spec.containers
+            )
+            merged_spec = extend_object_field(base_spec, client_spec, 'volumes')
+            return merge_objects(base_spec, merged_spec)
+
+        return None
+
+    @staticmethod
+    def reconcile_containers(base_containers,
+                             client_containers):
+        """
+        :param base_containers: has the base attributes which are overwritten if they exist
+            in the client_containers and remain if they do not exist in the client_containers
+        :type base_containers: List[k8s.V1Container]
+        :param client_containers: the containers that the client wants to create.
+        :type client_containers: List[k8s.V1Container]
+        :return: the merged containers
+
+        The runs recursively over the list of containers.
+        """
+        if not base_containers:
+            return client_containers
+        if not client_containers:
+            return base_containers
+
+        client_container = client_containers[0]
+        base_container = base_containers[0]
+        client_container = extend_object_field(base_container, client_container, 'volume_mounts')
+        client_container = extend_object_field(base_container, client_container, 'env')
+        client_container = extend_object_field(base_container, client_container, 'env_from')
+        client_container = extend_object_field(base_container, client_container, 'ports')
+        client_container = extend_object_field(base_container, client_container, 'volume_devices')
+        client_container = merge_objects(base_container, client_container)
+
+        return [client_container] + PodGenerator.reconcile_containers(
+            base_containers[1:], client_containers[1:]
+        )
+
+    @staticmethod
+    def construct_pod(
+        dag_id,
+        task_id,
+        pod_id,
+        try_number,
+        date,
+        command,
+        kube_executor_config,
+        worker_config,
+        namespace,
+        worker_uuid
+    ):
+        """
+        Construct a pod by gathering and consolidating the configuration from 3 places:
+            - airflow.cfg
+            - executor_config
+            - dynamic arguments
+        """
+        dynamic_pod = PodGenerator(
+            namespace=namespace,
+            image='',
+            labels={
+                'airflow-worker': worker_uuid,
+                'dag_id': dag_id,
+                'task_id': task_id,
+                'execution_date': date,
+                'try_number': str(try_number),
+                'airflow_version': airflow_version.replace('+', '-'),
+                'kubernetes_executor': 'True',
+            },
+            cmds=command,
+            name=pod_id
+        ).gen_pod()
+
+        # Reconcile the pod generated by the Operator and the Pod
+        # generated by the .cfg file
+        pod_with_executor_config = PodGenerator.reconcile_pods(worker_config,
+                                                               kube_executor_config)
+        # Reconcile that pod with the dynamic fields.
+        return PodGenerator.reconcile_pods(pod_with_executor_config, dynamic_pod)
+
+
+def merge_objects(base_obj, client_obj):
+    """
+    :param base_obj: has the base attributes which are overwritten if they exist
+        in the client_obj and remain if they do not exist in the client_obj
+    :param client_obj: the object that the client wants to create.
+    :return: the merged objects
+    """
+    if not base_obj:
+        return client_obj
+    if not client_obj:
+        return base_obj
+
+    client_obj_cp = copy.deepcopy(client_obj)
+
+    for base_key in base_obj.to_dict().keys():
+        base_val = getattr(base_obj, base_key, None)
+        if not getattr(client_obj, base_key, None) and base_val:
+            setattr(client_obj_cp, base_key, base_val)
+    return client_obj_cp
+
+
+def extend_object_field(base_obj, client_obj, field_name):
+    """
+    :param base_obj: an object which has a property `field_name` that is a list
+    :param client_obj: an object which has a property `field_name` that is a list.
+        A copy of this object is returned with `field_name` modified
+    :param field_name: the name of the list field
+    :type field_name: str
+    :return: the client_obj with the property `field_name` being the two properties appended
+    """
+    client_obj_cp = copy.deepcopy(client_obj)
+    base_obj_field = getattr(base_obj, field_name, None)
+    client_obj_field = getattr(client_obj, field_name, None)
+
+    if (not isinstance(base_obj_field, list) and base_obj_field is not None) or \
+       (not isinstance(client_obj_field, list) and client_obj_field is not None):
+        raise ValueError("The chosen field must be a list.")
+
+    if not base_obj_field:
+        return client_obj_cp
+    if not client_obj_field:
+        setattr(client_obj_cp, field_name, base_obj_field)
+        return client_obj_cp
+
+    appended_fields = base_obj_field + client_obj_field
+    setattr(client_obj_cp, field_name, appended_fields)
+    return client_obj_cp
diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py
index bed1ac2..3464e81 100644
--- a/airflow/kubernetes/worker_configuration.py
+++ b/airflow/kubernetes/worker_configuration.py
@@ -25,7 +25,6 @@ from airflow.kubernetes.k8s_model import append_to_pod
 from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.kubernetes.secret import Secret
 from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.version import version as airflow_version
 
 
 class WorkerConfiguration(LoggingMixin):
@@ -418,23 +417,12 @@ class WorkerConfiguration(LoggingMixin):
 
         return self.kube_config.git_dags_folder_mount_point
 
-    def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_date,
-                 try_number, airflow_command):
+    def as_pod(self):
+        """Creates POD."""
         pod_generator = PodGenerator(
-            namespace=namespace,
-            name=pod_id,
             image=self.kube_config.kube_image,
             image_pull_policy=self.kube_config.kube_image_pull_policy,
-            labels={
-                'airflow-worker': worker_uuid,
-                'dag_id': dag_id,
-                'task_id': task_id,
-                'execution_date': execution_date,
-                'try_number': str(try_number),
-                'airflow_version': airflow_version.replace('+', '-'),
-                'kubernetes_executor': 'True',
-            },
-            cmds=airflow_command,
+            image_pull_secrets=self.kube_config.image_pull_secrets,
             volumes=self._get_volumes(),
             volume_mounts=self._get_volume_mounts(),
             init_containers=self._get_init_containers(),
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 993c47a..2b3ed17 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -26,12 +26,12 @@ from urllib3 import HTTPResponse
 
 from airflow.utils import timezone
 from tests.compat import mock
-
+from tests.test_utils.config import conf_vars
 try:
     from kubernetes.client.rest import ApiException
     from airflow import configuration  # noqa: F401
     from airflow.configuration import conf  # noqa: F401
-    from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
+    from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler, KubeConfig
     from airflow.executors.kubernetes_executor import KubernetesExecutor
     from airflow.kubernetes import pod_generator
     from airflow.kubernetes.pod_generator import PodGenerator
@@ -124,6 +124,56 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
         self.assertEqual(datetime_obj, new_datetime_obj)
 
 
+class TestKubeConfig(unittest.TestCase):
+    def setUp(self):
+        if AirflowKubernetesScheduler is None:
+            self.skipTest("kubernetes python package is not installed")
+
+    @conf_vars({
+        ('kubernetes', 'git_ssh_known_hosts_configmap_name'): 'airflow-configmap',
+        ('kubernetes', 'git_ssh_key_secret_name'): 'airflow-secrets',
+        ('kubernetes_annotations', "iam.com/role"): "role-arn",
+        ('kubernetes_annotations', "other/annotation"):  "value"
+    })
+    def test_kube_config_worker_annotations_properly_parsed(self):
+        annotations = KubeConfig().kube_annotations
+        self.assertEqual({'iam.com/role': 'role-arn', 'other/annotation': 'value'}, annotations)
+
+    @conf_vars({
+        ('kubernetes', 'git_ssh_known_hosts_configmap_name'): 'airflow-configmap',
+        ('kubernetes', 'git_ssh_key_secret_name'): 'airflow-secrets'
+    })
+    def test_kube_config_no_worker_annotations(self):
+        annotations = KubeConfig().kube_annotations
+        self.assertIsNone(annotations)
+
+    @conf_vars({
+        ('kubernetes', 'git_repo'): 'foo',
+        ('kubernetes', 'git_branch'): 'foo',
+        ('kubernetes', 'git_dags_folder_mount_point'): 'foo',
+        ('kubernetes', 'git_sync_run_as_user'): '0',
+    })
+    def test_kube_config_git_sync_run_as_user_root(self):
+        self.assertEqual(KubeConfig().git_sync_run_as_user, 0)
+
+    @conf_vars({
+        ('kubernetes', 'git_repo'): 'foo',
+        ('kubernetes', 'git_branch'): 'foo',
+        ('kubernetes', 'git_dags_folder_mount_point'): 'foo',
+    })
+    def test_kube_config_git_sync_run_as_user_not_present(self):
+        self.assertEqual(KubeConfig().git_sync_run_as_user, 65533)
+
+    @conf_vars({
+        ('kubernetes', 'git_repo'): 'foo',
+        ('kubernetes', 'git_branch'): 'foo',
+        ('kubernetes', 'git_dags_folder_mount_point'): 'foo',
+        ('kubernetes', 'git_sync_run_as_user'): '',
+    })
+    def test_kube_config_git_sync_run_as_user_empty_string(self):
+        self.assertEqual(KubeConfig().git_sync_run_as_user, '')
+
+
 class TestKubernetesExecutor(unittest.TestCase):
     """
     Tests if an ApiException from the Kube Client will cause the task to
@@ -136,44 +186,45 @@ class TestKubernetesExecutor(unittest.TestCase):
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
         # When a quota is exceeded this is the ApiException we get
-        r = HTTPResponse(
+        response = HTTPResponse(
             body='{"kind": "Status", "apiVersion": "v1", "metadata": {}, "status": "Failure", '
                  '"message": "pods \\"podname\\" is forbidden: exceeded quota: compute-resources, '
                  'requested: limits.memory=4Gi, used: limits.memory=6508Mi, limited: limits.memory=10Gi", '
                  '"reason": "Forbidden", "details": {"name": "podname", "kind": "pods"}, "code": 403}')
-        r.status = 403
-        r.reason = "Forbidden"
+        response.status = 403
+        response.reason = "Forbidden"
 
         # A mock kube_client that throws errors when making a pod
         mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
         mock_kube_client.create_namespaced_pod = mock.MagicMock(
-            side_effect=ApiException(http_resp=r))
+            side_effect=ApiException(http_resp=response))
         mock_get_kube_client.return_value = mock_kube_client
         mock_api_client = mock.MagicMock()
         mock_api_client.sanitize_for_serialization.return_value = {}
         mock_kube_client.api_client = mock_api_client
 
-        kubernetesExecutor = KubernetesExecutor()
-        kubernetesExecutor.start()
+        kubernetes_executor = KubernetesExecutor()
+        kubernetes_executor.start()
 
         # Execute a task while the Api Throws errors
         try_number = 1
-        kubernetesExecutor.execute_async(key=('dag', 'task', datetime.utcnow(), try_number),
-                                         command=['airflow', 'run', 'true', 'some_parameter'],
-                                         executor_config={})
-        kubernetesExecutor.sync()
-        kubernetesExecutor.sync()
+        kubernetes_executor.execute_async(key=('dag', 'task', datetime.utcnow(), try_number),
+                                          queue=None,
+                                          command=['airflow', 'run', 'command'],
+                                          executor_config={})
+        kubernetes_executor.sync()
+        kubernetes_executor.sync()
 
         assert mock_kube_client.create_namespaced_pod.called
-        self.assertFalse(kubernetesExecutor.task_queue.empty())
+        self.assertFalse(kubernetes_executor.task_queue.empty())
 
         # Disable the ApiException
         mock_kube_client.create_namespaced_pod.side_effect = None
 
         # Execute the task without errors should empty the queue
-        kubernetesExecutor.sync()
+        kubernetes_executor.sync()
         assert mock_kube_client.create_namespaced_pod.called
-        self.assertTrue(kubernetesExecutor.task_queue.empty())
+        self.assertTrue(kubernetes_executor.task_queue.empty())
 
     @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.sync')
@@ -187,22 +238,19 @@ class TestKubernetesExecutor(unittest.TestCase):
                  mock.call('executor.running_tasks', mock.ANY)]
         mock_stats_gauge.assert_has_calls(calls)
 
-    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher, mock_kube_config):
+    def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher):
         executor = KubernetesExecutor()
         executor.start()
         key = ('dag_id', 'task_id', 'ex_time', 'try_number1')
         executor._change_state(key, State.RUNNING, 'pod_id', 'default')
         self.assertTrue(executor.event_buffer[key] == State.RUNNING)
 
-    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
-    def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher,
-                                  mock_kube_config):
+    def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
         executor = KubernetesExecutor()
         executor.start()
         test_time = timezone.utcnow()
@@ -211,12 +259,10 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
         mock_delete_pod.assert_called_once_with('pod_id', 'default')
 
-    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
-    def test_change_state_failed(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher,
-                                 mock_kube_config):
+    def test_change_state_failed(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
         executor = KubernetesExecutor()
         executor.kube_config.delete_worker_pods = False
         executor.kube_config.delete_worker_pods_on_failure = False
@@ -227,12 +273,11 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.assertTrue(executor.event_buffer[key] == State.FAILED)
         mock_delete_pod.assert_not_called()
 
-    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
     def test_change_state_skip_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
-                                            mock_kubernetes_job_watcher, mock_kube_config):
+                                            mock_kubernetes_job_watcher):
         test_time = timezone.utcnow()
         executor = KubernetesExecutor()
         executor.kube_config.delete_worker_pods = False
@@ -243,12 +288,11 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
         mock_delete_pod.assert_not_called()
 
-    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
     def test_change_state_failed_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
-                                              mock_kubernetes_job_watcher, mock_kube_config):
+                                              mock_kubernetes_job_watcher):
         executor = KubernetesExecutor()
         executor.kube_config.delete_worker_pods_on_failure = True
 
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index 30839e7..a9a3aa5 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -20,15 +20,17 @@ from tests.compat import mock
 import uuid
 import kubernetes.client.models as k8s
 from kubernetes.client import ApiClient
-from airflow.kubernetes.secret import Secret
-from airflow.kubernetes.pod_generator import PodGenerator, PodDefaults
-from airflow.kubernetes.pod import Resources
+
 from airflow.kubernetes.k8s_model import append_to_pod
+from airflow.kubernetes.pod import Resources
+from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator, extend_object_field, merge_objects
+from airflow.kubernetes.secret import Secret
 
 
 class TestPodGenerator(unittest.TestCase):
 
     def setUp(self):
+        self.static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48')
         self.envs = {
             'ENVIRONMENT': 'prod',
             'LOG_LEVEL': 'warning'
@@ -41,9 +43,23 @@ class TestPodGenerator(unittest.TestCase):
             # This should produce a single secret mounted in env
             Secret('env', 'TARGET', 'secret_b', 'source_b'),
         ]
+        self.labels = {
+            'airflow-worker': 'uuid',
+            'dag_id': 'dag_id',
+            'execution_date': 'date',
+            'task_id': 'task_id',
+            'try_number': '3',
+            'airflow_version': mock.ANY,
+            'kubernetes_executor': 'True'
+        }
+        self.metadata = {
+            'labels': self.labels,
+            'name': 'pod_id-' + self.static_uuid.hex,
+            'namespace': 'namespace'
+        }
+
         self.resources = Resources('1Gi', 1, '2Gi', 2, 1)
         self.k8s_client = ApiClient()
-        self.static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48')
         self.expected = {
             'apiVersion': 'v1',
             'kind': 'Pod',
@@ -171,9 +187,9 @@ class TestPodGenerator(unittest.TestCase):
                 fs_group=2000,
             ),
             ports=[k8s.V1ContainerPort(name='foo', container_port=1234)],
-            configmaps=['configmap_a', 'configmap_b']
+            configmaps=['configmap_a', 'configmap_b'],
+            extract_xcom=True
         )
-        pod_generator.extract_xcom = True
         result = pod_generator.gen_pod()
         result = append_to_pod(result, self.secrets)
         result = self.resources.attach_to_pod(result)
@@ -253,79 +269,452 @@ class TestPodGenerator(unittest.TestCase):
             }
         }, result)
 
-    def test_reconcile_pods(self):
-        with mock.patch('uuid.uuid4') as mock_uuid:
-            mock_uuid.return_value = self.static_uuid
-            base_pod = PodGenerator(
-                image='image1',
-                name='name1',
-                envs={'key1': 'val1'},
-                cmds=['/bin/command1.sh', 'arg1'],
-                ports=k8s.V1ContainerPort(name='port', container_port=2118),
-                volumes=[{
-                    'hostPath': {'path': '/tmp/'},
-                    'name': 'example-kubernetes-test-volume1'
-                }],
-                volume_mounts=[{
-                    'mountPath': '/foo/',
-                    'name': 'example-kubernetes-test-volume1'
-                }],
-            ).gen_pod()
-
-            mutator_pod = PodGenerator(
-                envs={'key2': 'val2'},
-                image='',
-                name='name2',
-                cmds=['/bin/command2.sh', 'arg2'],
-                volumes=[{
-                    'hostPath': {'path': '/tmp/'},
-                    'name': 'example-kubernetes-test-volume2'
-                }],
-                volume_mounts=[{
-                    'mountPath': '/foo/',
-                    'name': 'example-kubernetes-test-volume2'
-                }]
-            ).gen_pod()
-
-            result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
-            result = self.k8s_client.sanitize_for_serialization(result)
-            self.assertEqual(result, {
-                'apiVersion': 'v1',
-                'kind': 'Pod',
-                'metadata': {'name': 'name2-' + self.static_uuid.hex},
-                'spec': {
-                    'containers': [{
-                        'args': [],
-                        'command': ['/bin/command1.sh', 'arg1'],
-                        'env': [
-                            {'name': 'key1', 'value': 'val1'},
-                            {'name': 'key2', 'value': 'val2'}
-                        ],
-                        'envFrom': [],
-                        'image': 'image1',
-                        'imagePullPolicy': 'IfNotPresent',
-                        'name': 'base',
-                        'ports': {
-                            'containerPort': 2118,
-                            'name': 'port',
-                        },
-                        'volumeMounts': [{
-                            'mountPath': '/foo/',
-                            'name': 'example-kubernetes-test-volume1'
-                        }, {
-                            'mountPath': '/foo/',
-                            'name': 'example-kubernetes-test-volume2'
-                        }]
+    @mock.patch('uuid.uuid4')
+    def test_reconcile_pods_empty_mutator_pod(self, mock_uuid):
+        mock_uuid.return_value = self.static_uuid
+        base_pod = PodGenerator(
+            image='image1',
+            name='name1',
+            envs={'key1': 'val1'},
+            cmds=['/bin/command1.sh', 'arg1'],
+            ports=[k8s.V1ContainerPort(name='port', container_port=2118)],
+            volumes=[{
+                'hostPath': {'path': '/tmp/'},
+                'name': 'example-kubernetes-test-volume1'
+            }],
+            volume_mounts=[{
+                'mountPath': '/foo/',
+                'name': 'example-kubernetes-test-volume1'
+            }],
+        ).gen_pod()
+
+        mutator_pod = None
+        name = 'name1-' + self.static_uuid.hex
+
+        base_pod.metadata.name = name
+
+        result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
+        self.assertEqual(base_pod, result)
+
+        mutator_pod = k8s.V1Pod()
+        result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
+        self.assertEqual(base_pod, result)
+
+    @mock.patch('uuid.uuid4')
+    def test_reconcile_pods(self, mock_uuid):
+        mock_uuid.return_value = self.static_uuid
+        base_pod = PodGenerator(
+            image='image1',
+            name='name1',
+            envs={'key1': 'val1'},
+            cmds=['/bin/command1.sh', 'arg1'],
+            ports=[k8s.V1ContainerPort(name='port', container_port=2118)],
+            volumes=[{
+                'hostPath': {'path': '/tmp/'},
+                'name': 'example-kubernetes-test-volume1'
+            }],
+            volume_mounts=[{
+                'mountPath': '/foo/',
+                'name': 'example-kubernetes-test-volume1'
+            }],
+        ).gen_pod()
+
+        mutator_pod = PodGenerator(
+            envs={'key2': 'val2'},
+            image='',
+            name='name2',
+            cmds=['/bin/command2.sh', 'arg2'],
+            volumes=[{
+                'hostPath': {'path': '/tmp/'},
+                'name': 'example-kubernetes-test-volume2'
+            }],
+            volume_mounts=[{
+                'mountPath': '/foo/',
+                'name': 'example-kubernetes-test-volume2'
+            }]
+        ).gen_pod()
+
+        result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
+        result = self.k8s_client.sanitize_for_serialization(result)
+        self.assertEqual(result, {
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': {'name': 'name2-' + self.static_uuid.hex},
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': ['/bin/command2.sh', 'arg2'],
+                    'env': [
+                        {'name': 'key1', 'value': 'val1'},
+                        {'name': 'key2', 'value': 'val2'}
+                    ],
+                    'envFrom': [],
+                    'image': 'image1',
+                    'imagePullPolicy': 'IfNotPresent',
+                    'name': 'base',
+                    'ports': [{
+                        'containerPort': 2118,
+                        'name': 'port',
                     }],
-                    'hostNetwork': False,
-                    'imagePullSecrets': [],
-                    'restartPolicy': 'Never',
-                    'volumes': [{
-                        'hostPath': {'path': '/tmp/'},
+                    'volumeMounts': [{
+                        'mountPath': '/foo/',
                         'name': 'example-kubernetes-test-volume1'
                     }, {
-                        'hostPath': {'path': '/tmp/'},
+                        'mountPath': '/foo/',
                         'name': 'example-kubernetes-test-volume2'
                     }]
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'restartPolicy': 'Never',
+                'volumes': [{
+                    'hostPath': {'path': '/tmp/'},
+                    'name': 'example-kubernetes-test-volume1'
+                }, {
+                    'hostPath': {'path': '/tmp/'},
+                    'name': 'example-kubernetes-test-volume2'
+                }]
+            }
+        })
+
+    @mock.patch('uuid.uuid4')
+    def test_construct_pod_empty_worker_config(self, mock_uuid):
+        mock_uuid.return_value = self.static_uuid
+        executor_config = k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name='',
+                        resources=k8s.V1ResourceRequirements(
+                            limits={
+                                'cpu': '1m',
+                                'memory': '1G'
+                            }
+                        )
+                    )
+                ]
+            )
+        )
+        worker_config = k8s.V1Pod()
+
+        result = PodGenerator.construct_pod(
+            'dag_id',
+            'task_id',
+            'pod_id',
+            3,
+            'date',
+            ['command'],
+            executor_config,
+            worker_config,
+            'namespace',
+            'uuid',
+        )
+        sanitized_result = self.k8s_client.sanitize_for_serialization(result)
+
+        self.assertEqual({
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': self.metadata,
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': ['command'],
+                    'env': [],
+                    'envFrom': [],
+                    'imagePullPolicy': 'IfNotPresent',
+                    'name': 'base',
+                    'ports': [],
+                    'resources': {
+                        'limits': {
+                            'cpu': '1m',
+                            'memory': '1G'
+                        }
+                    },
+                    'volumeMounts': []
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'restartPolicy': 'Never',
+                'volumes': []
+            }
+        }, sanitized_result)
+
+    @mock.patch('uuid.uuid4')
+    def test_construct_pod_empty_execuctor_config(self, mock_uuid):
+        mock_uuid.return_value = self.static_uuid
+        worker_config = k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name='',
+                        resources=k8s.V1ResourceRequirements(
+                            limits={
+                                'cpu': '1m',
+                                'memory': '1G'
+                            }
+                        )
+                    )
+                ]
+            )
+        )
+        executor_config = None
+
+        result = PodGenerator.construct_pod(
+            'dag_id',
+            'task_id',
+            'pod_id',
+            3,
+            'date',
+            ['command'],
+            executor_config,
+            worker_config,
+            'namespace',
+            'uuid',
+        )
+        sanitized_result = self.k8s_client.sanitize_for_serialization(result)
+
+        self.assertEqual({
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': self.metadata,
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': ['command'],
+                    'env': [],
+                    'envFrom': [],
+                    'imagePullPolicy': 'IfNotPresent',
+                    'name': 'base',
+                    'ports': [],
+                    'resources': {
+                        'limits': {
+                            'cpu': '1m',
+                            'memory': '1G'
+                        }
+                    },
+                    'volumeMounts': []
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'restartPolicy': 'Never',
+                'volumes': []
+            }
+        }, sanitized_result)
+
+    @mock.patch('uuid.uuid4')
+    def test_construct_pod(self, mock_uuid):
+        mock_uuid.return_value = self.static_uuid
+        worker_config = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(
+                name='gets-overridden-by-dynamic-args',
+                annotations={
+                    'should': 'stay'
                 }
-            })
+            ),
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name='doesnt-override',
+                        resources=k8s.V1ResourceRequirements(
+                            limits={
+                                'cpu': '1m',
+                                'memory': '1G'
+                            }
+                        ),
+                        security_context=k8s.V1SecurityContext(
+                            run_as_user=1
+                        )
+                    )
+                ]
+            )
+        )
+        executor_config = k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name='doesnt-override-either',
+                        resources=k8s.V1ResourceRequirements(
+                            limits={
+                                'cpu': '2m',
+                                'memory': '2G'
+                            }
+                        )
+                    )
+                ]
+            )
+        )
+
+        result = PodGenerator.construct_pod(
+            'dag_id',
+            'task_id',
+            'pod_id',
+            3,
+            'date',
+            ['command'],
+            executor_config,
+            worker_config,
+            'namespace',
+            'uuid',
+        )
+        sanitized_result = self.k8s_client.sanitize_for_serialization(result)
+
+        self.metadata.update({'annotations': {'should': 'stay'}})
+
+        self.assertEqual({
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': self.metadata,
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': ['command'],
+                    'env': [],
+                    'envFrom': [],
+                    'imagePullPolicy': 'IfNotPresent',
+                    'name': 'base',
+                    'ports': [],
+                    'resources': {
+                        'limits': {
+                            'cpu': '2m',
+                            'memory': '2G'
+                        }
+                    },
+                    'volumeMounts': [],
+                    'securityContext': {'runAsUser': 1}
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'restartPolicy': 'Never',
+                'volumes': []
+            }
+        }, sanitized_result)
+
+    def test_merge_objects_empty(self):
+        annotations = {'foo1': 'bar1'}
+        base_obj = k8s.V1ObjectMeta(annotations=annotations)
+        client_obj = None
+        res = merge_objects(base_obj, client_obj)
+        self.assertEqual(base_obj, res)
+
+        client_obj = k8s.V1ObjectMeta()
+        res = merge_objects(base_obj, client_obj)
+        self.assertEqual(base_obj, res)
+
+        client_obj = k8s.V1ObjectMeta(annotations=annotations)
+        base_obj = None
+        res = merge_objects(base_obj, client_obj)
+        self.assertEqual(client_obj, res)
+
+        base_obj = k8s.V1ObjectMeta()
+        res = merge_objects(base_obj, client_obj)
+        self.assertEqual(client_obj, res)
+
+    def test_merge_objects(self):
+        base_annotations = {'foo1': 'bar1'}
+        base_labels = {'foo1': 'bar1'}
+        client_annotations = {'foo2': 'bar2'}
+        base_obj = k8s.V1ObjectMeta(
+            annotations=base_annotations,
+            labels=base_labels
+        )
+        client_obj = k8s.V1ObjectMeta(annotations=client_annotations)
+        res = merge_objects(base_obj, client_obj)
+        client_obj.labels = base_labels
+        self.assertEqual(client_obj, res)
+
+    def test_extend_object_field_empty(self):
+        ports = [k8s.V1ContainerPort(container_port=1, name='port')]
+        base_obj = k8s.V1Container(name='base_container', ports=ports)
+        client_obj = k8s.V1Container(name='client_container')
+        res = extend_object_field(base_obj, client_obj, 'ports')
+        client_obj.ports = ports
+        self.assertEqual(client_obj, res)
+
+        base_obj = k8s.V1Container(name='base_container')
+        client_obj = k8s.V1Container(name='base_container', ports=ports)
+        res = extend_object_field(base_obj, client_obj, 'ports')
+        self.assertEqual(client_obj, res)
+
+    def test_extend_object_field_not_list(self):
+        base_obj = k8s.V1Container(name='base_container', image='image')
+        client_obj = k8s.V1Container(name='client_container')
+        with self.assertRaises(ValueError):
+            extend_object_field(base_obj, client_obj, 'image')
+        base_obj = k8s.V1Container(name='base_container')
+        client_obj = k8s.V1Container(name='client_container', image='image')
+        with self.assertRaises(ValueError):
+            extend_object_field(base_obj, client_obj, 'image')
+
+    def test_extend_object_field(self):
+        base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
+        base_obj = k8s.V1Container(name='base_container', ports=base_ports)
+        client_ports = [k8s.V1ContainerPort(container_port=1, name='client_port')]
+        client_obj = k8s.V1Container(name='client_container', ports=client_ports)
+        res = extend_object_field(base_obj, client_obj, 'ports')
+        client_obj.ports = base_ports + client_ports
+        self.assertEqual(client_obj, res)
+
+    def test_reconcile_containers_empty(self):
+        base_objs = [k8s.V1Container(name='base_container')]
+        client_objs = []
+        res = PodGenerator.reconcile_containers(base_objs, client_objs)
+        self.assertEqual(base_objs, res)
+
+        client_objs = [k8s.V1Container(name='client_container')]
+        base_objs = []
+        res = PodGenerator.reconcile_containers(base_objs, client_objs)
+        self.assertEqual(client_objs, res)
+
+        res = PodGenerator.reconcile_containers([], [])
+        self.assertEqual(res, [])
+
+    def test_reconcile_containers(self):
+        base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
+        base_objs = [
+            k8s.V1Container(name='base_container1', ports=base_ports),
+            k8s.V1Container(name='base_container2', image='base_image'),
+        ]
+        client_ports = [k8s.V1ContainerPort(container_port=2, name='client_port')]
+        client_objs = [
+            k8s.V1Container(name='client_container1', ports=client_ports),
+            k8s.V1Container(name='client_container2', image='client_image'),
+        ]
+        res = PodGenerator.reconcile_containers(base_objs, client_objs)
+        client_objs[0].ports = base_ports + client_ports
+        self.assertEqual(client_objs, res)
+
+        base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
+        base_objs = [
+            k8s.V1Container(name='base_container1', ports=base_ports),
+            k8s.V1Container(name='base_container2', image='base_image'),
+        ]
+        client_ports = [k8s.V1ContainerPort(container_port=2, name='client_port')]
+        client_objs = [
+            k8s.V1Container(name='client_container1', ports=client_ports),
+            k8s.V1Container(name='client_container2', stdin=True),
+        ]
+        res = PodGenerator.reconcile_containers(base_objs, client_objs)
+        client_objs[0].ports = base_ports + client_ports
+        client_objs[1].image = 'base_image'
+        self.assertEqual(client_objs, res)
+
+    def test_reconcile_specs_empty(self):
+        base_spec = k8s.V1PodSpec(containers=[])
+        client_spec = None
+        res = PodGenerator.reconcile_specs(base_spec, client_spec)
+        self.assertEqual(base_spec, res)
+
+        base_spec = None
+        client_spec = k8s.V1PodSpec(containers=[])
+        res = PodGenerator.reconcile_specs(base_spec, client_spec)
+        self.assertEqual(client_spec, res)
+
+    def test_reconcile_specs(self):
+        base_objs = [k8s.V1Container(name='base_container1', image='base_image')]
+        client_objs = [k8s.V1Container(name='client_container1')]
+        base_spec = k8s.V1PodSpec(priority=1, active_deadline_seconds=100, containers=base_objs)
+        client_spec = k8s.V1PodSpec(priority=2, hostname='local', containers=client_objs)
+        res = PodGenerator.reconcile_specs(base_spec, client_spec)
+        client_spec.containers = [k8s.V1Container(name='client_container1', image='base_image')]
+        client_spec.active_deadline_seconds = 100
+        self.assertEqual(client_spec, res)
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
index 8378f9f..74009a1 100644
--- a/tests/kubernetes/test_worker_configuration.py
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -17,13 +17,12 @@
 #
 
 import unittest
-import uuid
-from datetime import datetime
 
 import six
 
 from tests.compat import mock
 from tests.test_utils.config import conf_vars
+
 try:
     from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
     from airflow.executors.kubernetes_executor import KubeConfig
@@ -31,6 +30,7 @@ try:
     from airflow.kubernetes.pod_generator import PodGenerator
     from airflow.exceptions import AirflowConfigException
     from airflow.kubernetes.secret import Secret
+    from airflow.version import version as airflow_version
     import kubernetes.client.models as k8s
     from kubernetes.client.api_client import ApiClient
 except ImportError:
@@ -74,6 +74,11 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         }
     ]
 
+    worker_annotations_config = {
+        'iam.amazonaws.com/role': 'role-arn',
+        'other/annotation': 'value'
+    }
+
     def setUp(self):
         if AirflowKubernetesScheduler is None:
             self.skipTest("kubernetes python package is not installed")
@@ -312,11 +317,39 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         self.kube_config.git_subpath = 'path'
 
         worker_config = WorkerConfiguration(self.kube_config)
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+        pod = worker_config.as_pod()
 
         self.assertEqual(0, pod.spec.security_context.run_as_user)
 
+    def test_make_pod_assert_labels(self):
+        # Tests the pod created has all the expected labels set
+        self.kube_config.dags_folder = 'dags'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        pod = PodGenerator.construct_pod(
+            "test_dag_id",
+            "test_task_id",
+            "test_pod_id",
+            1,
+            "2019-11-21 11:08:22.920875",
+            ["bash -c 'ls /'"],
+            None,
+            worker_config.as_pod(),
+            "default",
+            "sample-uuid",
+
+        )
+        expected_labels = {
+            'airflow-worker': 'sample-uuid',
+            'airflow_version': airflow_version.replace('+', '-'),
+            'dag_id': 'test_dag_id',
+            'execution_date': '2019-11-21 11:08:22.920875',
+            'kubernetes_executor': 'True',
+            'task_id': 'test_task_id',
+            'try_number': '1'
+        }
+        self.assertEqual(pod.metadata.labels, expected_labels)
+
     def test_make_pod_git_sync_ssh_without_known_hosts(self):
         # Tests the pod created with git-sync SSH authentication option is correct without known hosts
         self.kube_config.airflow_configmap = 'airflow-configmap'
@@ -331,8 +364,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
 
         worker_config = WorkerConfiguration(self.kube_config)
 
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+        pod = worker_config.as_pod()
 
         init_containers = worker_config._get_init_containers()
         git_ssh_key_file = next((x.value for x in init_containers[0].env
@@ -361,8 +393,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
 
         worker_config = WorkerConfiguration(self.kube_config)
 
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+        pod = worker_config.as_pod()
 
         username_env = k8s.V1EnvVar(
             name='GIT_SYNC_USERNAME',
@@ -387,6 +418,29 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         self.assertIn(password_env, pod.spec.init_containers[0].env,
                       'The password env for git credentials did not get into the init container')
 
+    def test_make_pod_git_sync_rev(self):
+        # Tests the pod created with git_sync_credentials_secret will get into the init container
+        self.kube_config.git_sync_rev = 'sampletag'
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+        self.kube_config.worker_fs_group = None
+        self.kube_config.git_dags_folder_mount_point = 'dags'
+        self.kube_config.git_sync_dest = 'repo'
+        self.kube_config.git_subpath = 'path'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+
+        pod = worker_config.as_pod()
+
+        rev_env = k8s.V1EnvVar(
+            name='GIT_SYNC_REV',
+            value=self.kube_config.git_sync_rev,
+        )
+
+        self.assertIn(rev_env, pod.spec.init_containers[0].env,
+                      'The git_sync_rev env did not get into the init container')
+
     def test_make_pod_git_sync_ssh_with_known_hosts(self):
         # Tests the pod created with git-sync SSH authentication option is correct with known hosts
         self.kube_config.airflow_configmap = 'airflow-configmap'
@@ -415,11 +469,10 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
     def test_make_pod_with_empty_executor_config(self):
         self.kube_config.kube_affinity = self.affinity_config
         self.kube_config.kube_tolerations = self.tolerations_config
+        self.kube_config.kube_annotations = self.worker_annotations_config
         self.kube_config.dags_folder = 'dags'
         worker_config = WorkerConfiguration(self.kube_config)
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+        pod = worker_config.as_pod()
 
         self.assertTrue(pod.spec.affinity['podAntiAffinity'] is not None)
         self.assertEqual('app',
@@ -431,6 +484,8 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
 
         self.assertEqual(2, len(pod.spec.tolerations))
         self.assertEqual('prod', pod.spec.tolerations[1]['key'])
+        self.assertEqual('role-arn', pod.metadata.annotations['iam.amazonaws.com/role'])
+        self.assertEqual('value', pod.metadata.annotations['other/annotation'])
 
     def test_make_pod_with_executor_config(self):
         self.kube_config.dags_folder = 'dags'
@@ -441,8 +496,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
             tolerations=self.tolerations_config,
         ).gen_pod()
 
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+        pod = worker_config.as_pod()
 
         result = PodGenerator.reconcile_pods(pod, config_pod)
 
@@ -607,3 +661,18 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
             'dag_id': 'override_dag_id',
             'my_kube_executor_label': 'kubernetes'
         }, labels)
+
+    def test_make_pod_with_image_pull_secrets(self):
+        # Tests the pod created with image_pull_secrets actually gets that in it's config
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+        self.kube_config.git_dags_folder_mount_point = 'dags'
+        self.kube_config.git_sync_dest = 'repo'
+        self.kube_config.git_subpath = 'path'
+        self.kube_config.image_pull_secrets = 'image_pull_secret1,image_pull_secret2'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        pod = worker_config.as_pod()
+
+        self.assertEqual(2, len(pod.spec.image_pull_secrets))