Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/25 18:17:12 UTC

[airflow] branch v1-10-test updated (480989c -> 0bdf7d2)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard 480989c  [AIRFLOW-5413] Refactor worker config (#7114)
     new 3f05ec2  Filter dags by clicking on tag (#8897)
     new ec3038e  Allow passing backend_kwargs to AWS SSM client (#8802)
     new 0bdf7d2  [AIRFLOW-5413] Refactor worker config (#7114)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (480989c)
            \
             N -- N -- N   refs/heads/v1-10-test (0bdf7d2)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/contrib/secrets/aws_systems_manager.py    | 13 ++++++++++---
 airflow/www_rbac/templates/airflow/dags.html      |  6 +++++-
 tests/contrib/secrets/test_aws_systems_manager.py | 18 ++++++++++++++++++
 tests/secrets/test_secrets.py                     | 14 ++++++++++++++
 4 files changed, 47 insertions(+), 4 deletions(-)


[airflow] 01/03: Filter dags by clicking on tag (#8897)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3f05ec27daf9aac2e73ddebc26910044ab3e09d4
Author: sangarshanan <sa...@gmail.com>
AuthorDate: Wed May 27 16:42:33 2020 +0530

    Filter dags by clicking on tag (#8897)
    
    Co-authored-by: Zacharya <za...@gmail.com>
    
    (cherry-picked from 30b12a9)
---
 airflow/www_rbac/templates/airflow/dags.html | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/airflow/www_rbac/templates/airflow/dags.html b/airflow/www_rbac/templates/airflow/dags.html
index f93b65b..f2f131c 100644
--- a/airflow/www_rbac/templates/airflow/dags.html
+++ b/airflow/www_rbac/templates/airflow/dags.html
@@ -110,7 +110,11 @@
 
               <div style="float: right; max-width: 70%; text-align: right; line-height: 160%;">
                 {% for tag in dag.tags | sort(attribute='name') %}
-                  <span class="label label-success" style="margin: 3px;">{{ tag.name }}</span>
+                  <a class="label label-success"
+                     href="?tags={{ tag.name }}"
+                     style="margin: 3px;">
+                     {{ tag.name }}
+                  </a>
                 {% endfor %}
               </div>
             </td>
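
The hunk above only touches the template: each tag label becomes a link that reloads the dag list with a "?tags=<name>" query parameter. The view that consumes that parameter is not part of this commit, so the following is only a hypothetical sketch of the server side, using a plain Flask view with illustrative names rather than the actual www_rbac view code:

    # Hypothetical handling of the "?tags=<name>" links added above; the
    # real dags list view is not part of this commit, so all names here
    # are illustrative only.
    from flask import Flask, request

    app = Flask(__name__)

    DAGS = [
        {'dag_id': 'etl_daily', 'tags': ['etl', 'daily']},
        {'dag_id': 'train_model', 'tags': ['ml']},
    ]

    @app.route('/dags')
    def dag_list():
        # "?tags=etl&tags=ml" yields ['etl', 'ml']; no parameter means no filter
        tags = request.args.getlist('tags')
        matching = [d for d in DAGS if not tags or set(tags) & set(d['tags'])]
        return {'dags': [d['dag_id'] for d in matching]}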


[airflow] 03/03: [AIRFLOW-5413] Refactor worker config (#7114)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0bdf7d27582c034b494d9d6ca9f9b1711ae681e7
Author: davlum <da...@gmail.com>
AuthorDate: Thu Jan 9 15:39:05 2020 -0500

    [AIRFLOW-5413] Refactor worker config (#7114)
    
    (cherry picked from commit 51f262c65afd7eaecc54661a3b5c4e533feecff8)
---
 .github/workflows/ci.yml                           |   2 +-
 .../contrib/operators/kubernetes_pod_operator.py   |   4 +-
 airflow/executors/kubernetes_executor.py           |  11 +-
 airflow/kubernetes/pod_generator.py                | 256 ++++++++--
 airflow/kubernetes/worker_configuration.py         |  18 +-
 tests/executors/test_kubernetes_executor.py        |  96 +++-
 tests/kubernetes/test_pod_generator.py             | 541 ++++++++++++++++++---
 tests/kubernetes/test_worker_configuration.py      |  95 +++-
 8 files changed, 833 insertions(+), 190 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index fb16aaf..67b9e50 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -144,7 +144,7 @@ jobs:
       - name: Cache virtualenv for kubernetes testing
         uses: actions/cache@v2
         env:
-          cache-name: cache-kubernetes-tests-virtualenv-v2
+          cache-name: cache-kubernetes-tests-virtualenv-v3
         with:
           path: .build/.kubernetes_venv
           key: "${{ env.cache-name }}-${{ github.job }}-\
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index 8adb131..d439eda 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -367,11 +367,11 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                 pod,
                 startup_timeout=self.startup_timeout_seconds)
             final_state, result = launcher.monitor_pod(pod=pod, get_logs=self.get_logs)
-        except AirflowException:
+        except AirflowException as ex:
             if self.log_events_on_failure:
                 for event in launcher.read_pod_events(pod).items:
                     self.log.error("Pod Event: %s - %s", event.reason, event.message)
-            raise
+            raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
         finally:
             if self.is_delete_operator_pod:
                 launcher.delete_pod(pod)
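
This hunk changes a bare re-raise into a wrapped AirflowException that carries the original error text alongside the logged pod events. A standalone sketch of the pattern; the launcher, pod and event objects here are stand-ins, not the real Airflow classes:

    # Log-events-then-wrap pattern from the hunk above, with stand-in
    # launcher/pod/event objects.
    import logging

    log = logging.getLogger(__name__)

    class AirflowException(Exception):
        pass

    def run_pod_or_raise(launcher, pod, log_events_on_failure=True):
        try:
            return launcher.monitor_pod(pod=pod, get_logs=True)
        except AirflowException as ex:
            if log_events_on_failure:
                for event in launcher.read_pod_events(pod).items:
                    log.error("Pod Event: %s - %s", event.reason, event.message)
            # Wrapping (rather than a bare "raise") puts the original error
            # text into the new message; "raise ... from ex" is unavailable
            # because the 1.10 line still supports Python 2.
            raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
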
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index d458d7a..1048400 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -357,7 +357,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
         self.log.debug("Kubernetes using namespace %s", self.namespace)
         self.kube_client = kube_client
         self.launcher = PodLauncher(kube_client=self.kube_client)
-        self.worker_configuration = WorkerConfiguration(kube_config=self.kube_config)
+        self.worker_configuration_pod = WorkerConfiguration(kube_config=self.kube_config).as_pod()
         self._manager = multiprocessing.Manager()
         self.watcher_queue = self._manager.Queue()
         self.worker_uuid = worker_uuid
@@ -393,19 +393,20 @@ class AirflowKubernetesScheduler(LoggingMixin):
         if command[0:2] != ["airflow", "run"]:
             raise ValueError('The command must start with ["airflow", "run"].')
 
-        config_pod = self.worker_configuration.make_pod(
+        pod = PodGenerator.construct_pod(
             namespace=self.namespace,
             worker_uuid=self.worker_uuid,
             pod_id=self._create_pod_id(dag_id, task_id),
             dag_id=pod_generator.make_safe_label_value(dag_id),
             task_id=pod_generator.make_safe_label_value(task_id),
             try_number=try_number,
-            execution_date=self._datetime_to_label_safe_datestring(execution_date),
-            airflow_command=command
+            date=self._datetime_to_label_safe_datestring(execution_date),
+            command=command,
+            kube_executor_config=kube_executor_config,
+            worker_config=self.worker_configuration_pod
         )
         # Reconcile the pod generated by the Operator and the Pod
         # generated by the .cfg file
-        pod = PodGenerator.reconcile_pods(config_pod, kube_executor_config)
         self.log.debug("Kubernetes running for command %s", command)
         self.log.debug("Kubernetes launching image %s", pod.spec.containers[0].image)
 
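
Two things happen in these hunks: the airflow.cfg-derived worker pod is now built once when the scheduler starts (via as_pod()), and the per-task merge moves into PodGenerator.construct_pod, which layers the dynamic arguments over the executor_config pod over the worker pod. A sketch of the resulting precedence, assuming the construct_pod API introduced later in this patch:

    # Precedence sketch, assuming the construct_pod API introduced in this
    # patch: worker_config (airflow.cfg) < kube_executor_config < dynamic
    # per-task arguments.
    import kubernetes.client.models as k8s
    from airflow.kubernetes.pod_generator import PodGenerator

    worker_config = k8s.V1Pod(spec=k8s.V1PodSpec(containers=[
        k8s.V1Container(name='base', image='airflow:cfg-image')]))
    executor_config = k8s.V1Pod(spec=k8s.V1PodSpec(containers=[
        k8s.V1Container(name='base', image='airflow:task-image')]))

    pod = PodGenerator.construct_pod(
        dag_id='dag', task_id='task', pod_id='dag-task', try_number=1,
        date='2020-06-25', command=['airflow', 'run'],
        kube_executor_config=executor_config, worker_config=worker_config,
        namespace='default', worker_uuid='worker-uuid')

    # The executor_config image overrides the airflow.cfg one; the dynamic
    # layer contributes only labels, name, namespace and command here.
    assert pod.spec.containers[0].image == 'airflow:task-image'
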
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index a614f41..aead51b 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -28,7 +28,7 @@ import uuid
 
 import kubernetes.client.models as k8s
 
-from airflow.executors import Executors
+from airflow.version import version as airflow_version
 
 MAX_LABEL_LEN = 63
 
@@ -87,28 +87,59 @@ class PodGenerator:
     Contains Kubernetes Airflow Worker configuration logic
 
     Represents a kubernetes pod and manages execution of a single pod.
+    Any configuration that is container specific gets applied to
+    the first container in the list of containers.
+
+    Parameters with a type of `kubernetes.client.models.*`/`k8s.*` can
+    often be replaced with their dictionary equivalent, for example the output of
+    `sanitize_for_serialization`.
+
     :param image: The docker image
-    :type image: str
+    :type image: Optional[str]
+    :param name: name in the metadata section (not the container name)
+    :type name: Optional[str]
+    :param namespace: pod namespace
+    :type namespace: Optional[str]
+    :param volume_mounts: list of kubernetes volumes mounts
+    :type volume_mounts: Optional[List[Union[k8s.V1VolumeMount, dict]]]
     :param envs: A dict containing the environment variables
-    :type envs: Dict[str, str]
-    :param cmds: The command to be run on the pod
-    :type cmds: List[str]
-    :param secrets: Secrets to be launched to the pod
-    :type secrets: List[airflow.kubernetes.models.secret.Secret]
+    :type envs: Optional[Dict[str, str]]
+    :param cmds: The command to be run on the first container
+    :type cmds: Optional[List[str]]
+    :param args: The arguments to be run on the pod
+    :type args: Optional[List[str]]
+    :param labels: labels for the pod metadata
+    :type labels: Optional[Dict[str, str]]
+    :param node_selectors: node selectors for the pod
+    :type node_selectors: Optional[Dict[str, str]]
+    :param ports: list of ports. Applies to the first container.
+    :type ports: Optional[List[Union[k8s.V1ContainerPort, dict]]]
+    :param volumes: Volumes to be attached to the first container
+    :type volumes: Optional[List[Union[k8s.V1Volume, dict]]]
     :param image_pull_policy: Specify a policy to cache or always pull an image
     :type image_pull_policy: str
+    :param restart_policy: The restart policy of the pod
+    :type restart_policy: str
     :param image_pull_secrets: Any image pull secrets to be given to the pod.
         If more than one secret is required, provide a comma separated list:
         secret_a,secret_b
     :type image_pull_secrets: str
+    :param init_containers: A list of init containers
+    :type init_containers: Optional[List[k8s.V1Container]]
+    :param service_account_name: Identity for processes that run in a Pod
+    :type service_account_name: Optional[str]
+    :param resources: Resource requirements for the first containers
+    :type resources: Optional[Union[k8s.V1ResourceRequirements, dict]]
+    :param annotations: annotations for the pod
+    :type annotations: Optional[Dict[str, str]]
     :param affinity: A dict containing a group of affinity scheduling rules
-    :type affinity: dict
+    :type affinity: Optional[dict]
     :param hostnetwork: If True enable host networking on the pod
     :type hostnetwork: bool
     :param tolerations: A list of kubernetes tolerations
-    :type tolerations: list
+    :type tolerations: Optional[list]
     :param security_context: A dict containing the security context for the pod
-    :type security_context: dict
+    :type security_context: Optional[Union[k8s.V1PodSecurityContext, dict]]
     :param configmaps: Any configmap refs to envfrom.
         If more than one configmap is required, provide a comma separated list
         configmap_a,configmap_b
@@ -117,6 +148,8 @@ class PodGenerator:
     :type dnspolicy: str
     :param pod: The fully specified pod.
     :type pod: kubernetes.client.models.V1Pod
+    :param extract_xcom: Whether to bring up a container for xcom
+    :type extract_xcom: bool
     """
 
     def __init__(
@@ -225,10 +258,11 @@ class PodGenerator:
             result.metadata = self.metadata
             result.spec.containers = [self.container]
 
+        result.metadata.name = self.make_unique_pod_id(result.metadata.name)
+
         if self.extract_xcom:
             result = self.add_sidecar(result)
 
-        result.metadata.name = self.make_unique_pod_id(result.metadata.name)
         return result
 
     @staticmethod
@@ -252,8 +286,9 @@ class PodGenerator:
     @staticmethod
     def add_sidecar(pod):
         pod_cp = copy.deepcopy(pod)
-
+        pod_cp.spec.volumes = pod.spec.volumes or []
         pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)
+        pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or []
         pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT)
         pod_cp.spec.containers.append(PodDefaults.SIDECAR_CONTAINER)
 
@@ -262,7 +297,7 @@ class PodGenerator:
     @staticmethod
     def from_obj(obj):
         if obj is None:
-            return k8s.V1Pod()
+            return None
 
         if isinstance(obj, PodGenerator):
             return obj.gen_pod()
@@ -272,7 +307,12 @@ class PodGenerator:
                 'Cannot convert a non-dictionary or non-PodGenerator '
                 'object into a KubernetesExecutorConfig')
 
-        namespaced = obj.get(Executors.KubernetesExecutor, {})
+        # We do not want to extract the constant from ExecutorLoader here: it is just
+        # a dictionary key, not an executor selection mechanism, and importing it causes a cyclic import
+        namespaced = obj.get("KubernetesExecutor", {})
+
+        if not namespaced:
+            return None
 
         resources = namespaced.get('resources')
 
@@ -348,46 +388,158 @@ class PodGenerator:
         should be preserved from base, the volumes appended to and
         the other fields overwritten.
         """
+        if client_pod is None:
+            return base_pod
 
         client_pod_cp = copy.deepcopy(client_pod)
+        client_pod_cp.spec = PodGenerator.reconcile_specs(base_pod.spec, client_pod_cp.spec)
 
-        def merge_objects(base_obj, client_obj):
-            for base_key in base_obj.to_dict().keys():
-                base_val = getattr(base_obj, base_key, None)
-                if not getattr(client_obj, base_key, None) and base_val:
-                    setattr(client_obj, base_key, base_val)
-
-        def extend_object_field(base_obj, client_obj, field_name):
-            base_obj_field = getattr(base_obj, field_name, None)
-            client_obj_field = getattr(client_obj, field_name, None)
-            if not base_obj_field:
-                return
-            if not client_obj_field:
-                setattr(client_obj, field_name, base_obj_field)
-                return
-            appended_fields = base_obj_field + client_obj_field
-            setattr(client_obj, field_name, appended_fields)
-
-        # Values at the pod and metadata should be overwritten where they exist,
-        # but certain values at the spec and container level must be conserved.
-        base_container = base_pod.spec.containers[0]
-        client_container = client_pod_cp.spec.containers[0]
-
-        extend_object_field(base_container, client_container, 'volume_mounts')
-        extend_object_field(base_container, client_container, 'env')
-        extend_object_field(base_container, client_container, 'env_from')
-        extend_object_field(base_container, client_container, 'ports')
-        extend_object_field(base_container, client_container, 'volume_devices')
-        client_container.command = base_container.command
-        client_container.args = base_container.args
-        merge_objects(base_pod.spec.containers[0], client_pod_cp.spec.containers[0])
-        # Just append any additional containers from the base pod
-        client_pod_cp.spec.containers.extend(base_pod.spec.containers[1:])
-
-        merge_objects(base_pod.metadata, client_pod_cp.metadata)
-
-        extend_object_field(base_pod.spec, client_pod_cp.spec, 'volumes')
-        merge_objects(base_pod.spec, client_pod_cp.spec)
-        merge_objects(base_pod, client_pod_cp)
+        client_pod_cp.metadata = merge_objects(base_pod.metadata, client_pod_cp.metadata)
+        client_pod_cp = merge_objects(base_pod, client_pod_cp)
 
         return client_pod_cp
+
+    @staticmethod
+    def reconcile_specs(base_spec,
+                        client_spec):
+        """
+        :param base_spec: has the base attributes which are overwritten if they exist
+            in the client_spec and remain if they do not exist in the client_spec
+        :type base_spec: k8s.V1PodSpec
+        :param client_spec: the spec that the client wants to create.
+        :type client_spec: k8s.V1PodSpec
+        :return: the merged specs
+        """
+        if base_spec and not client_spec:
+            return base_spec
+        if not base_spec and client_spec:
+            return client_spec
+        elif client_spec and base_spec:
+            client_spec.containers = PodGenerator.reconcile_containers(
+                base_spec.containers, client_spec.containers
+            )
+            merged_spec = extend_object_field(base_spec, client_spec, 'volumes')
+            return merge_objects(base_spec, merged_spec)
+
+        return None
+
+    @staticmethod
+    def reconcile_containers(base_containers,
+                             client_containers):
+        """
+        :param base_containers: has the base attributes which are overwritten if they exist
+            in the client_containers and remain if they do not exist in the client_containers
+        :type base_containers: List[k8s.V1Container]
+        :param client_containers: the containers that the client wants to create.
+        :type client_containers: List[k8s.V1Container]
+        :return: the merged containers
+
+        This runs recursively over the list of containers.
+        """
+        if not base_containers:
+            return client_containers
+        if not client_containers:
+            return base_containers
+
+        client_container = client_containers[0]
+        base_container = base_containers[0]
+        client_container = extend_object_field(base_container, client_container, 'volume_mounts')
+        client_container = extend_object_field(base_container, client_container, 'env')
+        client_container = extend_object_field(base_container, client_container, 'env_from')
+        client_container = extend_object_field(base_container, client_container, 'ports')
+        client_container = extend_object_field(base_container, client_container, 'volume_devices')
+        client_container = merge_objects(base_container, client_container)
+
+        return [client_container] + PodGenerator.reconcile_containers(
+            base_containers[1:], client_containers[1:]
+        )
+
+    @staticmethod
+    def construct_pod(
+        dag_id,
+        task_id,
+        pod_id,
+        try_number,
+        date,
+        command,
+        kube_executor_config,
+        worker_config,
+        namespace,
+        worker_uuid
+    ):
+        """
+        Construct a pod by gathering and consolidating the configuration from 3 places:
+            - airflow.cfg
+            - executor_config
+            - dynamic arguments
+        """
+        dynamic_pod = PodGenerator(
+            namespace=namespace,
+            labels={
+                'airflow-worker': worker_uuid,
+                'dag_id': dag_id,
+                'task_id': task_id,
+                'execution_date': date,
+                'try_number': str(try_number),
+                'airflow_version': airflow_version.replace('+', '-'),
+                'kubernetes_executor': 'True',
+            },
+            cmds=command,
+            name=pod_id
+        ).gen_pod()
+
+        # Reconcile the pod generated by the Operator and the Pod
+        # generated by the .cfg file
+        pod_with_executor_config = PodGenerator.reconcile_pods(worker_config,
+                                                               kube_executor_config)
+        # Reconcile that pod with the dynamic fields.
+        return PodGenerator.reconcile_pods(pod_with_executor_config, dynamic_pod)
+
+
+def merge_objects(base_obj, client_obj):
+    """
+    :param base_obj: has the base attributes which are overwritten if they exist
+        in the client_obj and remain if they do not exist in the client_obj
+    :param client_obj: the object that the client wants to create.
+    :return: the merged objects
+    """
+    if not base_obj:
+        return client_obj
+    if not client_obj:
+        return base_obj
+
+    client_obj_cp = copy.deepcopy(client_obj)
+
+    for base_key in base_obj.to_dict().keys():
+        base_val = getattr(base_obj, base_key, None)
+        if not getattr(client_obj, base_key, None) and base_val:
+            setattr(client_obj_cp, base_key, base_val)
+    return client_obj_cp
+
+
+def extend_object_field(base_obj, client_obj, field_name):
+    """
+    :param base_obj: an object which has a property `field_name` that is a list
+    :param client_obj: an object which has a property `field_name` that is a list.
+        A copy of this object is returned with `field_name` modified
+    :param field_name: the name of the list field
+    :type field_name: str
+    :return: the client_obj with the property `field_name` set to the two lists appended
+    """
+    client_obj_cp = copy.deepcopy(client_obj)
+    base_obj_field = getattr(base_obj, field_name, None)
+    client_obj_field = getattr(client_obj, field_name, None)
+
+    if (not isinstance(base_obj_field, list) and base_obj_field is not None) or \
+       (not isinstance(client_obj_field, list) and client_obj_field is not None):
+        raise ValueError("The chosen field must be a list.")
+
+    if not base_obj_field:
+        return client_obj_cp
+    if not client_obj_field:
+        setattr(client_obj_cp, field_name, base_obj_field)
+        return client_obj_cp
+
+    appended_fields = base_obj_field + client_obj_field
+    setattr(client_obj_cp, field_name, appended_fields)
+    return client_obj_cp
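
merge_objects and extend_object_field are now module-level helpers: scalar attributes from the client object win and base attributes only fill gaps, while list attributes are concatenated base-first. A small demonstration with kubernetes client models, mirroring the unit tests further down:

    # Demonstration of the two merge helpers, mirroring the unit tests below.
    import kubernetes.client.models as k8s
    from airflow.kubernetes.pod_generator import extend_object_field, merge_objects

    base = k8s.V1Container(name='base', image='img:1',
                           env=[k8s.V1EnvVar(name='A', value='1')])
    client = k8s.V1Container(name='client',
                             env=[k8s.V1EnvVar(name='B', value='2')])

    merged = merge_objects(base, client)
    assert merged.name == 'client'   # client scalar wins
    assert merged.image == 'img:1'   # base fills the gap

    extended = extend_object_field(base, client, 'env')
    assert [e.name for e in extended.env] == ['A', 'B']  # lists are appended
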
diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py
index bed1ac2..3464e81 100644
--- a/airflow/kubernetes/worker_configuration.py
+++ b/airflow/kubernetes/worker_configuration.py
@@ -25,7 +25,6 @@ from airflow.kubernetes.k8s_model import append_to_pod
 from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.kubernetes.secret import Secret
 from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.version import version as airflow_version
 
 
 class WorkerConfiguration(LoggingMixin):
@@ -418,23 +417,12 @@ class WorkerConfiguration(LoggingMixin):
 
         return self.kube_config.git_dags_folder_mount_point
 
-    def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_date,
-                 try_number, airflow_command):
+    def as_pod(self):
+        """Creates POD."""
         pod_generator = PodGenerator(
-            namespace=namespace,
-            name=pod_id,
             image=self.kube_config.kube_image,
             image_pull_policy=self.kube_config.kube_image_pull_policy,
-            labels={
-                'airflow-worker': worker_uuid,
-                'dag_id': dag_id,
-                'task_id': task_id,
-                'execution_date': execution_date,
-                'try_number': str(try_number),
-                'airflow_version': airflow_version.replace('+', '-'),
-                'kubernetes_executor': 'True',
-            },
-            cmds=airflow_command,
+            image_pull_secrets=self.kube_config.image_pull_secrets,
             volumes=self._get_volumes(),
             volume_mounts=self._get_volume_mounts(),
             init_containers=self._get_init_containers(),
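
After this change WorkerConfiguration carries no per-task state: as_pod() yields only the static pod implied by airflow.cfg, built once at executor start-up, and the per-task fields are layered on by PodGenerator.construct_pod. A sketch of the new call pattern, assuming a configured Airflow environment so that KubeConfig() can load its settings:

    # New call pattern after the refactor; assumes a configured Airflow
    # environment so KubeConfig() can read values from airflow.cfg.
    from airflow.executors.kubernetes_executor import KubeConfig
    from airflow.kubernetes.pod_generator import PodGenerator
    from airflow.kubernetes.worker_configuration import WorkerConfiguration

    worker_pod = WorkerConfiguration(kube_config=KubeConfig()).as_pod()  # once, at startup

    pod = PodGenerator.construct_pod(  # per task
        dag_id='example_dag', task_id='t1', pod_id='example-dag-t1', try_number=1,
        date='2020-06-25T00:00:00', command=['airflow', 'run', 'example_dag', 't1'],
        kube_executor_config=None, worker_config=worker_pod,
        namespace='default', worker_uuid='worker-uuid')
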
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 993c47a..3b83d0a 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -26,12 +26,12 @@ from urllib3 import HTTPResponse
 
 from airflow.utils import timezone
 from tests.compat import mock
-
+from tests.test_utils.config import conf_vars
 try:
     from kubernetes.client.rest import ApiException
     from airflow import configuration  # noqa: F401
     from airflow.configuration import conf  # noqa: F401
-    from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
+    from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler, KubeConfig
     from airflow.executors.kubernetes_executor import KubernetesExecutor
     from airflow.kubernetes import pod_generator
     from airflow.kubernetes.pod_generator import PodGenerator
@@ -124,6 +124,55 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
         self.assertEqual(datetime_obj, new_datetime_obj)
 
 
+class TestKubeConfig(unittest.TestCase):
+    def setUp(self):
+        if AirflowKubernetesScheduler is None:
+            self.skipTest("kubernetes python package is not installed")
+
+    @conf_vars({
+        ('kubernetes', 'git_ssh_known_hosts_configmap_name'): 'airflow-configmap',
+        ('kubernetes', 'git_ssh_key_secret_name'): 'airflow-secrets',
+        ('kubernetes', 'worker_annotations'): '{ "iam.com/role" : "role-arn", "other/annotation" : "value" }'
+    })
+    def test_kube_config_worker_annotations_properly_parsed(self):
+        annotations = KubeConfig().kube_annotations
+        self.assertEqual({'iam.com/role': 'role-arn', 'other/annotation': 'value'}, annotations)
+
+    @conf_vars({
+        ('kubernetes', 'git_ssh_known_hosts_configmap_name'): 'airflow-configmap',
+        ('kubernetes', 'git_ssh_key_secret_name'): 'airflow-secrets'
+    })
+    def test_kube_config_no_worker_annotations(self):
+        annotations = KubeConfig().kube_annotations
+        self.assertIsNone(annotations)
+
+    @conf_vars({
+        ('kubernetes', 'git_repo'): 'foo',
+        ('kubernetes', 'git_branch'): 'foo',
+        ('kubernetes', 'git_dags_folder_mount_point'): 'foo',
+        ('kubernetes', 'git_sync_run_as_user'): '0',
+    })
+    def test_kube_config_git_sync_run_as_user_root(self):
+        self.assertEqual(KubeConfig().git_sync_run_as_user, 0)
+
+    @conf_vars({
+        ('kubernetes', 'git_repo'): 'foo',
+        ('kubernetes', 'git_branch'): 'foo',
+        ('kubernetes', 'git_dags_folder_mount_point'): 'foo',
+    })
+    def test_kube_config_git_sync_run_as_user_not_present(self):
+        self.assertEqual(KubeConfig().git_sync_run_as_user, 65533)
+
+    @conf_vars({
+        ('kubernetes', 'git_repo'): 'foo',
+        ('kubernetes', 'git_branch'): 'foo',
+        ('kubernetes', 'git_dags_folder_mount_point'): 'foo',
+        ('kubernetes', 'git_sync_run_as_user'): '',
+    })
+    def test_kube_config_git_sync_run_as_user_empty_string(self):
+        self.assertEqual(KubeConfig().git_sync_run_as_user, '')
+
+
 class TestKubernetesExecutor(unittest.TestCase):
     """
     Tests if an ApiException from the Kube Client will cause the task to
@@ -136,44 +185,45 @@ class TestKubernetesExecutor(unittest.TestCase):
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
         # When a quota is exceeded this is the ApiException we get
-        r = HTTPResponse(
+        response = HTTPResponse(
             body='{"kind": "Status", "apiVersion": "v1", "metadata": {}, "status": "Failure", '
                  '"message": "pods \\"podname\\" is forbidden: exceeded quota: compute-resources, '
                  'requested: limits.memory=4Gi, used: limits.memory=6508Mi, limited: limits.memory=10Gi", '
                  '"reason": "Forbidden", "details": {"name": "podname", "kind": "pods"}, "code": 403}')
-        r.status = 403
-        r.reason = "Forbidden"
+        response.status = 403
+        response.reason = "Forbidden"
 
         # A mock kube_client that throws errors when making a pod
         mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
         mock_kube_client.create_namespaced_pod = mock.MagicMock(
-            side_effect=ApiException(http_resp=r))
+            side_effect=ApiException(http_resp=response))
         mock_get_kube_client.return_value = mock_kube_client
         mock_api_client = mock.MagicMock()
         mock_api_client.sanitize_for_serialization.return_value = {}
         mock_kube_client.api_client = mock_api_client
 
-        kubernetesExecutor = KubernetesExecutor()
-        kubernetesExecutor.start()
+        kubernetes_executor = KubernetesExecutor()
+        kubernetes_executor.start()
 
         # Execute a task while the Api Throws errors
         try_number = 1
-        kubernetesExecutor.execute_async(key=('dag', 'task', datetime.utcnow(), try_number),
-                                         command=['airflow', 'run', 'true', 'some_parameter'],
-                                         executor_config={})
-        kubernetesExecutor.sync()
-        kubernetesExecutor.sync()
+        kubernetes_executor.execute_async(key=('dag', 'task', datetime.utcnow(), try_number),
+                                          queue=None,
+                                          command='command',
+                                          executor_config={})
+        kubernetes_executor.sync()
+        kubernetes_executor.sync()
 
         assert mock_kube_client.create_namespaced_pod.called
-        self.assertFalse(kubernetesExecutor.task_queue.empty())
+        self.assertFalse(kubernetes_executor.task_queue.empty())
 
         # Disable the ApiException
         mock_kube_client.create_namespaced_pod.side_effect = None
 
         # Execute the task without errors should empty the queue
-        kubernetesExecutor.sync()
+        kubernetes_executor.sync()
         assert mock_kube_client.create_namespaced_pod.called
-        self.assertTrue(kubernetesExecutor.task_queue.empty())
+        self.assertTrue(kubernetes_executor.task_queue.empty())
 
     @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.sync')
@@ -187,22 +237,19 @@ class TestKubernetesExecutor(unittest.TestCase):
                  mock.call('executor.running_tasks', mock.ANY)]
         mock_stats_gauge.assert_has_calls(calls)
 
-    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher, mock_kube_config):
+    def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher):
         executor = KubernetesExecutor()
         executor.start()
         key = ('dag_id', 'task_id', 'ex_time', 'try_number1')
         executor._change_state(key, State.RUNNING, 'pod_id', 'default')
         self.assertTrue(executor.event_buffer[key] == State.RUNNING)
 
-    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
-    def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher,
-                                  mock_kube_config):
+    def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
         executor = KubernetesExecutor()
         executor.start()
         test_time = timezone.utcnow()
@@ -211,12 +258,10 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
         mock_delete_pod.assert_called_once_with('pod_id', 'default')
 
-    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
-    def test_change_state_failed(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher,
-                                 mock_kube_config):
+    def test_change_state_failed(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
         executor = KubernetesExecutor()
         executor.kube_config.delete_worker_pods = False
         executor.kube_config.delete_worker_pods_on_failure = False
@@ -227,12 +272,11 @@ class TestKubernetesExecutor(unittest.TestCase):
         self.assertTrue(executor.event_buffer[key] == State.FAILED)
         mock_delete_pod.assert_not_called()
 
-    @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
     def test_change_state_skip_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
-                                            mock_kubernetes_job_watcher, mock_kube_config):
+                                            mock_kubernetes_job_watcher):
         test_time = timezone.utcnow()
         executor = KubernetesExecutor()
         executor.kube_config.delete_worker_pods = False
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index 30839e7..a9a3aa5 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -20,15 +20,17 @@ from tests.compat import mock
 import uuid
 import kubernetes.client.models as k8s
 from kubernetes.client import ApiClient
-from airflow.kubernetes.secret import Secret
-from airflow.kubernetes.pod_generator import PodGenerator, PodDefaults
-from airflow.kubernetes.pod import Resources
+
 from airflow.kubernetes.k8s_model import append_to_pod
+from airflow.kubernetes.pod import Resources
+from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator, extend_object_field, merge_objects
+from airflow.kubernetes.secret import Secret
 
 
 class TestPodGenerator(unittest.TestCase):
 
     def setUp(self):
+        self.static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48')
         self.envs = {
             'ENVIRONMENT': 'prod',
             'LOG_LEVEL': 'warning'
@@ -41,9 +43,23 @@ class TestPodGenerator(unittest.TestCase):
             # This should produce a single secret mounted in env
             Secret('env', 'TARGET', 'secret_b', 'source_b'),
         ]
+        self.labels = {
+            'airflow-worker': 'uuid',
+            'dag_id': 'dag_id',
+            'execution_date': 'date',
+            'task_id': 'task_id',
+            'try_number': '3',
+            'airflow_version': mock.ANY,
+            'kubernetes_executor': 'True'
+        }
+        self.metadata = {
+            'labels': self.labels,
+            'name': 'pod_id-' + self.static_uuid.hex,
+            'namespace': 'namespace'
+        }
+
         self.resources = Resources('1Gi', 1, '2Gi', 2, 1)
         self.k8s_client = ApiClient()
-        self.static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48')
         self.expected = {
             'apiVersion': 'v1',
             'kind': 'Pod',
@@ -171,9 +187,9 @@ class TestPodGenerator(unittest.TestCase):
                 fs_group=2000,
             ),
             ports=[k8s.V1ContainerPort(name='foo', container_port=1234)],
-            configmaps=['configmap_a', 'configmap_b']
+            configmaps=['configmap_a', 'configmap_b'],
+            extract_xcom=True
         )
-        pod_generator.extract_xcom = True
         result = pod_generator.gen_pod()
         result = append_to_pod(result, self.secrets)
         result = self.resources.attach_to_pod(result)
@@ -253,79 +269,452 @@ class TestPodGenerator(unittest.TestCase):
             }
         }, result)
 
-    def test_reconcile_pods(self):
-        with mock.patch('uuid.uuid4') as mock_uuid:
-            mock_uuid.return_value = self.static_uuid
-            base_pod = PodGenerator(
-                image='image1',
-                name='name1',
-                envs={'key1': 'val1'},
-                cmds=['/bin/command1.sh', 'arg1'],
-                ports=k8s.V1ContainerPort(name='port', container_port=2118),
-                volumes=[{
-                    'hostPath': {'path': '/tmp/'},
-                    'name': 'example-kubernetes-test-volume1'
-                }],
-                volume_mounts=[{
-                    'mountPath': '/foo/',
-                    'name': 'example-kubernetes-test-volume1'
-                }],
-            ).gen_pod()
-
-            mutator_pod = PodGenerator(
-                envs={'key2': 'val2'},
-                image='',
-                name='name2',
-                cmds=['/bin/command2.sh', 'arg2'],
-                volumes=[{
-                    'hostPath': {'path': '/tmp/'},
-                    'name': 'example-kubernetes-test-volume2'
-                }],
-                volume_mounts=[{
-                    'mountPath': '/foo/',
-                    'name': 'example-kubernetes-test-volume2'
-                }]
-            ).gen_pod()
-
-            result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
-            result = self.k8s_client.sanitize_for_serialization(result)
-            self.assertEqual(result, {
-                'apiVersion': 'v1',
-                'kind': 'Pod',
-                'metadata': {'name': 'name2-' + self.static_uuid.hex},
-                'spec': {
-                    'containers': [{
-                        'args': [],
-                        'command': ['/bin/command1.sh', 'arg1'],
-                        'env': [
-                            {'name': 'key1', 'value': 'val1'},
-                            {'name': 'key2', 'value': 'val2'}
-                        ],
-                        'envFrom': [],
-                        'image': 'image1',
-                        'imagePullPolicy': 'IfNotPresent',
-                        'name': 'base',
-                        'ports': {
-                            'containerPort': 2118,
-                            'name': 'port',
-                        },
-                        'volumeMounts': [{
-                            'mountPath': '/foo/',
-                            'name': 'example-kubernetes-test-volume1'
-                        }, {
-                            'mountPath': '/foo/',
-                            'name': 'example-kubernetes-test-volume2'
-                        }]
+    @mock.patch('uuid.uuid4')
+    def test_reconcile_pods_empty_mutator_pod(self, mock_uuid):
+        mock_uuid.return_value = self.static_uuid
+        base_pod = PodGenerator(
+            image='image1',
+            name='name1',
+            envs={'key1': 'val1'},
+            cmds=['/bin/command1.sh', 'arg1'],
+            ports=[k8s.V1ContainerPort(name='port', container_port=2118)],
+            volumes=[{
+                'hostPath': {'path': '/tmp/'},
+                'name': 'example-kubernetes-test-volume1'
+            }],
+            volume_mounts=[{
+                'mountPath': '/foo/',
+                'name': 'example-kubernetes-test-volume1'
+            }],
+        ).gen_pod()
+
+        mutator_pod = None
+        name = 'name1-' + self.static_uuid.hex
+
+        base_pod.metadata.name = name
+
+        result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
+        self.assertEqual(base_pod, result)
+
+        mutator_pod = k8s.V1Pod()
+        result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
+        self.assertEqual(base_pod, result)
+
+    @mock.patch('uuid.uuid4')
+    def test_reconcile_pods(self, mock_uuid):
+        mock_uuid.return_value = self.static_uuid
+        base_pod = PodGenerator(
+            image='image1',
+            name='name1',
+            envs={'key1': 'val1'},
+            cmds=['/bin/command1.sh', 'arg1'],
+            ports=[k8s.V1ContainerPort(name='port', container_port=2118)],
+            volumes=[{
+                'hostPath': {'path': '/tmp/'},
+                'name': 'example-kubernetes-test-volume1'
+            }],
+            volume_mounts=[{
+                'mountPath': '/foo/',
+                'name': 'example-kubernetes-test-volume1'
+            }],
+        ).gen_pod()
+
+        mutator_pod = PodGenerator(
+            envs={'key2': 'val2'},
+            image='',
+            name='name2',
+            cmds=['/bin/command2.sh', 'arg2'],
+            volumes=[{
+                'hostPath': {'path': '/tmp/'},
+                'name': 'example-kubernetes-test-volume2'
+            }],
+            volume_mounts=[{
+                'mountPath': '/foo/',
+                'name': 'example-kubernetes-test-volume2'
+            }]
+        ).gen_pod()
+
+        result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
+        result = self.k8s_client.sanitize_for_serialization(result)
+        self.assertEqual(result, {
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': {'name': 'name2-' + self.static_uuid.hex},
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': ['/bin/command2.sh', 'arg2'],
+                    'env': [
+                        {'name': 'key1', 'value': 'val1'},
+                        {'name': 'key2', 'value': 'val2'}
+                    ],
+                    'envFrom': [],
+                    'image': 'image1',
+                    'imagePullPolicy': 'IfNotPresent',
+                    'name': 'base',
+                    'ports': [{
+                        'containerPort': 2118,
+                        'name': 'port',
                     }],
-                    'hostNetwork': False,
-                    'imagePullSecrets': [],
-                    'restartPolicy': 'Never',
-                    'volumes': [{
-                        'hostPath': {'path': '/tmp/'},
+                    'volumeMounts': [{
+                        'mountPath': '/foo/',
                         'name': 'example-kubernetes-test-volume1'
                     }, {
-                        'hostPath': {'path': '/tmp/'},
+                        'mountPath': '/foo/',
                         'name': 'example-kubernetes-test-volume2'
                     }]
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'restartPolicy': 'Never',
+                'volumes': [{
+                    'hostPath': {'path': '/tmp/'},
+                    'name': 'example-kubernetes-test-volume1'
+                }, {
+                    'hostPath': {'path': '/tmp/'},
+                    'name': 'example-kubernetes-test-volume2'
+                }]
+            }
+        })
+
+    @mock.patch('uuid.uuid4')
+    def test_construct_pod_empty_worker_config(self, mock_uuid):
+        mock_uuid.return_value = self.static_uuid
+        executor_config = k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name='',
+                        resources=k8s.V1ResourceRequirements(
+                            limits={
+                                'cpu': '1m',
+                                'memory': '1G'
+                            }
+                        )
+                    )
+                ]
+            )
+        )
+        worker_config = k8s.V1Pod()
+
+        result = PodGenerator.construct_pod(
+            'dag_id',
+            'task_id',
+            'pod_id',
+            3,
+            'date',
+            ['command'],
+            executor_config,
+            worker_config,
+            'namespace',
+            'uuid',
+        )
+        sanitized_result = self.k8s_client.sanitize_for_serialization(result)
+
+        self.assertEqual({
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': self.metadata,
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': ['command'],
+                    'env': [],
+                    'envFrom': [],
+                    'imagePullPolicy': 'IfNotPresent',
+                    'name': 'base',
+                    'ports': [],
+                    'resources': {
+                        'limits': {
+                            'cpu': '1m',
+                            'memory': '1G'
+                        }
+                    },
+                    'volumeMounts': []
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'restartPolicy': 'Never',
+                'volumes': []
+            }
+        }, sanitized_result)
+
+    @mock.patch('uuid.uuid4')
+    def test_construct_pod_empty_executor_config(self, mock_uuid):
+        mock_uuid.return_value = self.static_uuid
+        worker_config = k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name='',
+                        resources=k8s.V1ResourceRequirements(
+                            limits={
+                                'cpu': '1m',
+                                'memory': '1G'
+                            }
+                        )
+                    )
+                ]
+            )
+        )
+        executor_config = None
+
+        result = PodGenerator.construct_pod(
+            'dag_id',
+            'task_id',
+            'pod_id',
+            3,
+            'date',
+            ['command'],
+            executor_config,
+            worker_config,
+            'namespace',
+            'uuid',
+        )
+        sanitized_result = self.k8s_client.sanitize_for_serialization(result)
+
+        self.assertEqual({
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': self.metadata,
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': ['command'],
+                    'env': [],
+                    'envFrom': [],
+                    'imagePullPolicy': 'IfNotPresent',
+                    'name': 'base',
+                    'ports': [],
+                    'resources': {
+                        'limits': {
+                            'cpu': '1m',
+                            'memory': '1G'
+                        }
+                    },
+                    'volumeMounts': []
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'restartPolicy': 'Never',
+                'volumes': []
+            }
+        }, sanitized_result)
+
+    @mock.patch('uuid.uuid4')
+    def test_construct_pod(self, mock_uuid):
+        mock_uuid.return_value = self.static_uuid
+        worker_config = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(
+                name='gets-overridden-by-dynamic-args',
+                annotations={
+                    'should': 'stay'
                 }
-            })
+            ),
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name='doesnt-override',
+                        resources=k8s.V1ResourceRequirements(
+                            limits={
+                                'cpu': '1m',
+                                'memory': '1G'
+                            }
+                        ),
+                        security_context=k8s.V1SecurityContext(
+                            run_as_user=1
+                        )
+                    )
+                ]
+            )
+        )
+        executor_config = k8s.V1Pod(
+            spec=k8s.V1PodSpec(
+                containers=[
+                    k8s.V1Container(
+                        name='doesnt-override-either',
+                        resources=k8s.V1ResourceRequirements(
+                            limits={
+                                'cpu': '2m',
+                                'memory': '2G'
+                            }
+                        )
+                    )
+                ]
+            )
+        )
+
+        result = PodGenerator.construct_pod(
+            'dag_id',
+            'task_id',
+            'pod_id',
+            3,
+            'date',
+            ['command'],
+            executor_config,
+            worker_config,
+            'namespace',
+            'uuid',
+        )
+        sanitized_result = self.k8s_client.sanitize_for_serialization(result)
+
+        self.metadata.update({'annotations': {'should': 'stay'}})
+
+        self.assertEqual({
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': self.metadata,
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': ['command'],
+                    'env': [],
+                    'envFrom': [],
+                    'imagePullPolicy': 'IfNotPresent',
+                    'name': 'base',
+                    'ports': [],
+                    'resources': {
+                        'limits': {
+                            'cpu': '2m',
+                            'memory': '2G'
+                        }
+                    },
+                    'volumeMounts': [],
+                    'securityContext': {'runAsUser': 1}
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'restartPolicy': 'Never',
+                'volumes': []
+            }
+        }, sanitized_result)
+
+    def test_merge_objects_empty(self):
+        annotations = {'foo1': 'bar1'}
+        base_obj = k8s.V1ObjectMeta(annotations=annotations)
+        client_obj = None
+        res = merge_objects(base_obj, client_obj)
+        self.assertEqual(base_obj, res)
+
+        client_obj = k8s.V1ObjectMeta()
+        res = merge_objects(base_obj, client_obj)
+        self.assertEqual(base_obj, res)
+
+        client_obj = k8s.V1ObjectMeta(annotations=annotations)
+        base_obj = None
+        res = merge_objects(base_obj, client_obj)
+        self.assertEqual(client_obj, res)
+
+        base_obj = k8s.V1ObjectMeta()
+        res = merge_objects(base_obj, client_obj)
+        self.assertEqual(client_obj, res)
+
+    def test_merge_objects(self):
+        base_annotations = {'foo1': 'bar1'}
+        base_labels = {'foo1': 'bar1'}
+        client_annotations = {'foo2': 'bar2'}
+        base_obj = k8s.V1ObjectMeta(
+            annotations=base_annotations,
+            labels=base_labels
+        )
+        client_obj = k8s.V1ObjectMeta(annotations=client_annotations)
+        res = merge_objects(base_obj, client_obj)
+        client_obj.labels = base_labels
+        self.assertEqual(client_obj, res)
+
+    def test_extend_object_field_empty(self):
+        ports = [k8s.V1ContainerPort(container_port=1, name='port')]
+        base_obj = k8s.V1Container(name='base_container', ports=ports)
+        client_obj = k8s.V1Container(name='client_container')
+        res = extend_object_field(base_obj, client_obj, 'ports')
+        client_obj.ports = ports
+        self.assertEqual(client_obj, res)
+
+        base_obj = k8s.V1Container(name='base_container')
+        client_obj = k8s.V1Container(name='base_container', ports=ports)
+        res = extend_object_field(base_obj, client_obj, 'ports')
+        self.assertEqual(client_obj, res)
+
+    def test_extend_object_field_not_list(self):
+        base_obj = k8s.V1Container(name='base_container', image='image')
+        client_obj = k8s.V1Container(name='client_container')
+        with self.assertRaises(ValueError):
+            extend_object_field(base_obj, client_obj, 'image')
+        base_obj = k8s.V1Container(name='base_container')
+        client_obj = k8s.V1Container(name='client_container', image='image')
+        with self.assertRaises(ValueError):
+            extend_object_field(base_obj, client_obj, 'image')
+
+    def test_extend_object_field(self):
+        base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
+        base_obj = k8s.V1Container(name='base_container', ports=base_ports)
+        client_ports = [k8s.V1ContainerPort(container_port=1, name='client_port')]
+        client_obj = k8s.V1Container(name='client_container', ports=client_ports)
+        res = extend_object_field(base_obj, client_obj, 'ports')
+        client_obj.ports = base_ports + client_ports
+        self.assertEqual(client_obj, res)
+
+    def test_reconcile_containers_empty(self):
+        base_objs = [k8s.V1Container(name='base_container')]
+        client_objs = []
+        res = PodGenerator.reconcile_containers(base_objs, client_objs)
+        self.assertEqual(base_objs, res)
+
+        client_objs = [k8s.V1Container(name='client_container')]
+        base_objs = []
+        res = PodGenerator.reconcile_containers(base_objs, client_objs)
+        self.assertEqual(client_objs, res)
+
+        res = PodGenerator.reconcile_containers([], [])
+        self.assertEqual(res, [])
+
+    def test_reconcile_containers(self):
+        base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
+        base_objs = [
+            k8s.V1Container(name='base_container1', ports=base_ports),
+            k8s.V1Container(name='base_container2', image='base_image'),
+        ]
+        client_ports = [k8s.V1ContainerPort(container_port=2, name='client_port')]
+        client_objs = [
+            k8s.V1Container(name='client_container1', ports=client_ports),
+            k8s.V1Container(name='client_container2', image='client_image'),
+        ]
+        res = PodGenerator.reconcile_containers(base_objs, client_objs)
+        client_objs[0].ports = base_ports + client_ports
+        self.assertEqual(client_objs, res)
+
+        base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
+        base_objs = [
+            k8s.V1Container(name='base_container1', ports=base_ports),
+            k8s.V1Container(name='base_container2', image='base_image'),
+        ]
+        client_ports = [k8s.V1ContainerPort(container_port=2, name='client_port')]
+        client_objs = [
+            k8s.V1Container(name='client_container1', ports=client_ports),
+            k8s.V1Container(name='client_container2', stdin=True),
+        ]
+        res = PodGenerator.reconcile_containers(base_objs, client_objs)
+        client_objs[0].ports = base_ports + client_ports
+        client_objs[1].image = 'base_image'
+        self.assertEqual(client_objs, res)
+
+    def test_reconcile_specs_empty(self):
+        base_spec = k8s.V1PodSpec(containers=[])
+        client_spec = None
+        res = PodGenerator.reconcile_specs(base_spec, client_spec)
+        self.assertEqual(base_spec, res)
+
+        base_spec = None
+        client_spec = k8s.V1PodSpec(containers=[])
+        res = PodGenerator.reconcile_specs(base_spec, client_spec)
+        self.assertEqual(client_spec, res)
+
+    def test_reconcile_specs(self):
+        base_objs = [k8s.V1Container(name='base_container1', image='base_image')]
+        client_objs = [k8s.V1Container(name='client_container1')]
+        base_spec = k8s.V1PodSpec(priority=1, active_deadline_seconds=100, containers=base_objs)
+        client_spec = k8s.V1PodSpec(priority=2, hostname='local', containers=client_objs)
+        res = PodGenerator.reconcile_specs(base_spec, client_spec)
+        client_spec.containers = [k8s.V1Container(name='client_container1', image='base_image')]
+        client_spec.active_deadline_seconds = 100
+        self.assertEqual(client_spec, res)
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
index 8378f9f..74009a1 100644
--- a/tests/kubernetes/test_worker_configuration.py
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -17,13 +17,12 @@
 #
 
 import unittest
-import uuid
-from datetime import datetime
 
 import six
 
 from tests.compat import mock
 from tests.test_utils.config import conf_vars
+
 try:
     from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
     from airflow.executors.kubernetes_executor import KubeConfig
@@ -31,6 +30,7 @@ try:
     from airflow.kubernetes.pod_generator import PodGenerator
     from airflow.exceptions import AirflowConfigException
     from airflow.kubernetes.secret import Secret
+    from airflow.version import version as airflow_version
     import kubernetes.client.models as k8s
     from kubernetes.client.api_client import ApiClient
 except ImportError:
@@ -74,6 +74,11 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         }
     ]
 
+    worker_annotations_config = {
+        'iam.amazonaws.com/role': 'role-arn',
+        'other/annotation': 'value'
+    }
+
     def setUp(self):
         if AirflowKubernetesScheduler is None:
             self.skipTest("kubernetes python package is not installed")
@@ -312,11 +317,38 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         self.kube_config.git_subpath = 'path'
 
         worker_config = WorkerConfiguration(self.kube_config)
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+        pod = worker_config.as_pod()
 
         self.assertEqual(0, pod.spec.security_context.run_as_user)
 
+    def test_make_pod_assert_labels(self):
+        # Tests that the created pod has all the expected labels set
+        self.kube_config.dags_folder = 'dags'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        pod = PodGenerator.construct_pod(
+            "test_dag_id",
+            "test_task_id",
+            "test_pod_id",
+            1,
+            "2019-11-21 11:08:22.920875",
+            ["bash -c 'ls /'"],
+            None,
+            worker_config.as_pod(),
+            "default",
+            "sample-uuid",
+        )
+        expected_labels = {
+            'airflow-worker': 'sample-uuid',
+            'airflow_version': airflow_version.replace('+', '-'),
+            'dag_id': 'test_dag_id',
+            'execution_date': '2019-11-21 11:08:22.920875',
+            'kubernetes_executor': 'True',
+            'task_id': 'test_task_id',
+            'try_number': '1'
+        }
+        self.assertEqual(pod.metadata.labels, expected_labels)
+
     def test_make_pod_git_sync_ssh_without_known_hosts(self):
         # Tests the pod created with git-sync SSH authentication option is correct without known hosts
         self.kube_config.airflow_configmap = 'airflow-configmap'
@@ -331,8 +364,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
 
         worker_config = WorkerConfiguration(self.kube_config)
 
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+        pod = worker_config.as_pod()
 
         init_containers = worker_config._get_init_containers()
         git_ssh_key_file = next((x.value for x in init_containers[0].env
@@ -361,8 +393,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
 
         worker_config = WorkerConfiguration(self.kube_config)
 
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+        pod = worker_config.as_pod()
 
         username_env = k8s.V1EnvVar(
             name='GIT_SYNC_USERNAME',
@@ -387,6 +418,29 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         self.assertIn(password_env, pod.spec.init_containers[0].env,
                       'The password env for git credentials did not get into the init container')
 
+    def test_make_pod_git_sync_rev(self):
+        # Tests that the pod created with git_sync_rev gets the GIT_SYNC_REV env into its init container
+        self.kube_config.git_sync_rev = 'sampletag'
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+        self.kube_config.worker_fs_group = None
+        self.kube_config.git_dags_folder_mount_point = 'dags'
+        self.kube_config.git_sync_dest = 'repo'
+        self.kube_config.git_subpath = 'path'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+
+        pod = worker_config.as_pod()
+
+        rev_env = k8s.V1EnvVar(
+            name='GIT_SYNC_REV',
+            value=self.kube_config.git_sync_rev,
+        )
+
+        self.assertIn(rev_env, pod.spec.init_containers[0].env,
+                      'The git_sync_rev env did not get into the init container')
+
     def test_make_pod_git_sync_ssh_with_known_hosts(self):
         # Tests the pod created with git-sync SSH authentication option is correct with known hosts
         self.kube_config.airflow_configmap = 'airflow-configmap'
@@ -415,11 +469,10 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
     def test_make_pod_with_empty_executor_config(self):
         self.kube_config.kube_affinity = self.affinity_config
         self.kube_config.kube_tolerations = self.tolerations_config
+        self.kube_config.kube_annotations = self.worker_annotations_config
         self.kube_config.dags_folder = 'dags'
         worker_config = WorkerConfiguration(self.kube_config)
-
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+        pod = worker_config.as_pod()
 
         self.assertTrue(pod.spec.affinity['podAntiAffinity'] is not None)
         self.assertEqual('app',
@@ -431,6 +484,8 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
 
         self.assertEqual(2, len(pod.spec.tolerations))
         self.assertEqual('prod', pod.spec.tolerations[1]['key'])
+        self.assertEqual('role-arn', pod.metadata.annotations['iam.amazonaws.com/role'])
+        self.assertEqual('value', pod.metadata.annotations['other/annotation'])
 
     def test_make_pod_with_executor_config(self):
         self.kube_config.dags_folder = 'dags'
@@ -441,8 +496,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
             tolerations=self.tolerations_config,
         ).gen_pod()
 
-        pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id",
-                                     "test_task_id", str(datetime.utcnow()), 1, "bash -c 'ls /'")
+        pod = worker_config.as_pod()
 
         result = PodGenerator.reconcile_pods(pod, config_pod)
 
@@ -607,3 +661,18 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
             'dag_id': 'override_dag_id',
             'my_kube_executor_label': 'kubernetes'
         }, labels)
+
+    def test_make_pod_with_image_pull_secrets(self):
+        # Tests that the pod created with image_pull_secrets actually gets them in its spec
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_volume_host = None
+        self.kube_config.dags_in_image = None
+        self.kube_config.git_dags_folder_mount_point = 'dags'
+        self.kube_config.git_sync_dest = 'repo'
+        self.kube_config.git_subpath = 'path'
+        self.kube_config.image_pull_secrets = 'image_pull_secret1,image_pull_secret2'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        pod = worker_config.as_pod()
+
+        self.assertEqual(2, len(pod.spec.image_pull_secrets))
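
The tests above also document the refactor's new entry point: the worker's
base pod now comes from WorkerConfiguration.as_pod() rather than the old
make_pod() call, and per-task executor_config pods are merged onto it. A
rough sketch of that flow, assuming the modules are importable as in this
test file (KubeConfig reads the local airflow.cfg; 'my_image' is just a
placeholder):

    from airflow.executors.kubernetes_executor import KubeConfig
    from airflow.kubernetes.pod_generator import PodGenerator
    from airflow.kubernetes.worker_configuration import WorkerConfiguration

    kube_config = KubeConfig()                            # reads airflow.cfg
    base_pod = WorkerConfiguration(kube_config).as_pod()  # cluster defaults
    task_pod = PodGenerator(image='my_image').gen_pod()   # executor_config
    # Task-level values override the worker defaults where both are set.
    pod = PodGenerator.reconcile_pods(base_pod, task_pod)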


[airflow] 02/03: Allow passing backend_kwargs to AWS SSM client (#8802)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ec3038e952cfbaff932ccb9c7c881093c013b077
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sun May 10 12:18:39 2020 +0100

    Allow passing backend_kwargs to AWS SSM client (#8802)
    
    (cherry picked from commit cbebed2b4d0bd1e0984c331c0270e83bf8df8540)
---
 airflow/contrib/secrets/aws_systems_manager.py    | 13 ++++++++++---
 tests/contrib/secrets/test_aws_systems_manager.py | 18 ++++++++++++++++++
 tests/secrets/test_secrets.py                     | 14 ++++++++++++++
 3 files changed, 42 insertions(+), 3 deletions(-)
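
In effect, any keys in the [secrets] backend_kwargs option beyond
connections_prefix, variables_prefix and profile_name are now captured and
forwarded verbatim to boto3's session.client("ssm", ...). A minimal sketch of
the resulting behaviour (assuming boto3 is installed; use_ssl is just one
example of a boto3 client option):

    from airflow.contrib.secrets.aws_systems_manager import (
        SystemsManagerParameterStoreBackend,
    )

    # The named parameters are consumed by the backend itself; everything
    # else is kept on self.kwargs and passed through to the SSM client.
    backend = SystemsManagerParameterStoreBackend(
        connections_prefix="/airflow/connections",
        use_ssl=False,
    )
    client = backend.client  # -> session.client("ssm", use_ssl=False)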

diff --git a/airflow/contrib/secrets/aws_systems_manager.py b/airflow/contrib/secrets/aws_systems_manager.py
index 862fa96..d85c736 100644
--- a/airflow/contrib/secrets/aws_systems_manager.py
+++ b/airflow/contrib/secrets/aws_systems_manager.py
@@ -43,6 +43,13 @@ class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
     if you provide ``{"connections_prefix": "/airflow/connections"}`` and request conn_id ``smtp_default``.
     And if ssm path is ``/airflow/variables/hello``, this would be accessible
     if you provide ``{"variables_prefix": "/airflow/variables"}`` and request variable ``hello``.
+
+    :param connections_prefix: Specifies the prefix of the secret to read to get Connections.
+    :type connections_prefix: str
+    :param variables_prefix: Specifies the prefix of the secret to read to get Variables.
+    :type variables_prefix: str
+    :param profile_name: The name of a profile to use. If not given, then the default profile is used.
+    :type profile_name: str
     """
 
     def __init__(
@@ -52,10 +59,11 @@ class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
         profile_name=None,  # type: Optional[str]
         **kwargs
     ):
+        super(SystemsManagerParameterStoreBackend, self).__init__(**kwargs)
         self.connections_prefix = connections_prefix.rstrip("/")
         self.variables_prefix = variables_prefix.rstrip('/')
         self.profile_name = profile_name
-        super(SystemsManagerParameterStoreBackend, self).__init__(**kwargs)
+        self.kwargs = kwargs
 
     @cached_property
     def client(self):
@@ -63,7 +71,7 @@ class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
         Create a SSM client
         """
         session = boto3.Session(profile_name=self.profile_name)
-        return session.client("ssm")
+        return session.client("ssm", **self.kwargs)
 
     def get_conn_uri(self, conn_id):
         # type: (str) -> Optional[str]
@@ -74,7 +82,6 @@ class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
         :type conn_id: str
         :rtype: str
         """
-
         return self._get_secret(self.connections_prefix, conn_id)
 
     def get_variable(self, key):
diff --git a/tests/contrib/secrets/test_aws_systems_manager.py b/tests/contrib/secrets/test_aws_systems_manager.py
index 9801f19..b7b48e1 100644
--- a/tests/contrib/secrets/test_aws_systems_manager.py
+++ b/tests/contrib/secrets/test_aws_systems_manager.py
@@ -22,6 +22,8 @@ from moto import mock_ssm
 
 from airflow.contrib.secrets.aws_systems_manager import SystemsManagerParameterStoreBackend
 from tests.compat import mock
+from airflow.secrets import initialize_secrets_backends
+from tests.test_utils.config import conf_vars
 
 
 class TestSystemsManagerParameterStoreBackend(unittest.TestCase):
@@ -108,3 +110,19 @@ class TestSystemsManagerParameterStoreBackend(unittest.TestCase):
         ssm_backend.client.put_parameter(**param)
 
         self.assertIsNone(ssm_backend.get_variable("test_mysql"))
+
+    @conf_vars({
+        ('secrets', 'backend'): 'airflow.contrib.secrets.aws_systems_manager.'
+                                'SystemsManagerParameterStoreBackend',
+        ('secrets', 'backend_kwargs'): '{"use_ssl": false}'
+    })
+    @mock.patch("airflow.contrib.secrets.aws_systems_manager.boto3.Session.client")
+    def test_passing_client_kwargs(self, mock_ssm_client):
+        backends = initialize_secrets_backends()
+        systems_manager = [
+            backend for backend in backends
+            if backend.__class__.__name__ == 'SystemsManagerParameterStoreBackend'
+        ][0]
+
+        systems_manager.client  # accessing the cached property creates the boto3 SSM client
+        mock_ssm_client.assert_called_once_with('ssm', use_ssl=False)
diff --git a/tests/secrets/test_secrets.py b/tests/secrets/test_secrets.py
index 93de118..1f78c11 100644
--- a/tests/secrets/test_secrets.py
+++ b/tests/secrets/test_secrets.py
@@ -57,6 +57,20 @@ class TestConnectionsFromSecrets(unittest.TestCase):
     @conf_vars({
         ("secrets", "backend"):
             "airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend",
+        ("secrets", "backend_kwargs"): '{"use_ssl": false}',
+    })
+    def test_backends_kwargs(self):
+        backends = initialize_secrets_backends()
+        systems_manager = [
+            backend for backend in backends
+            if backend.__class__.__name__ == 'SystemsManagerParameterStoreBackend'
+        ][0]
+
+        self.assertEqual(systems_manager.kwargs, {'use_ssl': False})
+
+    @conf_vars({
+        ("secrets", "backend"):
+            "airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend",
         ("secrets", "backend_kwargs"): '{"connections_prefix": "/airflow", "profile_name": null}',
     })
     @mock.patch.dict('os.environ', {