You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/12/15 05:32:26 UTC

[airflow] 01/01: KubernetesExecutor overrides should only add to lists

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch fix-env-from-1-10
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3a5663e43931903cee02abbea5193a6a8cb9e9e1
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Mon Dec 14 21:22:17 2020 -0800

    KubernetesExecutor overrides should only add to lists
    
    This PR makes the 1.10 behavior more similar to that of Airflow 2.0.
    Users are still able to override values that are in maps, but lists in
    k8s objects are too complicated to override consistently, so override
    entries are only appended to the existing lists.
---
 airflow/kubernetes/pod_generator.py    |  225 +++--
 tests/kubernetes/test_pod_generator.py | 1453 ++++++++++++++++----------------
 2 files changed, 848 insertions(+), 830 deletions(-)

diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 4df3198..8f0d141 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -34,7 +34,6 @@ from dateutil import parser
 from kubernetes.client.api_client import ApiClient
 from airflow.contrib.kubernetes.pod import _extract_volume_mounts
 
-from airflow.exceptions import AirflowConfigException
 from airflow.version import version as airflow_version
 
 MAX_LABEL_LEN = 63
@@ -50,21 +49,15 @@ class PodDefaults(object):
     def __init__(self):
         pass
 
-    XCOM_MOUNT_PATH = '/airflow/xcom'
-    SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
+    XCOM_MOUNT_PATH = "/airflow/xcom"
+    SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar"
     XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 30; done;'
-    VOLUME_MOUNT = k8s.V1VolumeMount(
-        name='xcom',
-        mount_path=XCOM_MOUNT_PATH
-    )
-    VOLUME = k8s.V1Volume(
-        name='xcom',
-        empty_dir=k8s.V1EmptyDirVolumeSource()
-    )
+    VOLUME_MOUNT = k8s.V1VolumeMount(name="xcom", mount_path=XCOM_MOUNT_PATH)
+    VOLUME = k8s.V1Volume(name="xcom", empty_dir=k8s.V1EmptyDirVolumeSource())
     SIDECAR_CONTAINER = k8s.V1Container(
         name=SIDECAR_CONTAINER_NAME,
-        command=['sh', '-c', XCOM_CMD],
-        image='alpine',
+        command=["sh", "-c", XCOM_CMD],
+        image="alpine",
         volume_mounts=[VOLUME_MOUNT],
         resources=k8s.V1ResourceRequirements(
             requests={
@@ -88,7 +81,7 @@ def make_safe_label_value(string):
 
     if len(safe_label) > MAX_LABEL_LEN or string != safe_label:
         safe_hash = hashlib.md5(string.encode()).hexdigest()[:9]
-        safe_label = safe_label[:MAX_LABEL_LEN - len(safe_hash) - 1] + "-" + safe_hash
+        safe_label = safe_label[: MAX_LABEL_LEN - len(safe_hash) - 1] + "-" + safe_hash
 
     return safe_label
 
@@ -102,7 +95,7 @@ def datetime_to_label_safe_datestring(datetime_obj):
     :param datetime_obj: datetime.datetime object
     :return: ISO-like string representing the datetime
     """
-    return datetime_obj.isoformat().replace(":", "_").replace('+', '_plus_')
+    return datetime_obj.isoformat().replace(":", "_").replace("+", "_plus_")
 
 
 def label_safe_datestring_to_datetime(string):
@@ -114,7 +107,7 @@ def label_safe_datestring_to_datetime(string):
     :param string: str
     :return: datetime.datetime object
     """
-    return parser.parse(string.replace('_plus_', '+').replace("_", ":"))
+    return parser.parse(string.replace("_plus_", "+").replace("_", ":"))
 
 
 class PodGenerator(object):
@@ -230,8 +223,8 @@ class PodGenerator(object):
             self.ud_pod = pod
 
         self.pod = k8s.V1Pod()
-        self.pod.api_version = 'v1'
-        self.pod.kind = 'Pod'
+        self.pod.api_version = "v1"
+        self.pod.kind = "Pod"
 
         # Pod Metadata
         self.metadata = k8s.V1ObjectMeta()
@@ -241,35 +234,34 @@ class PodGenerator(object):
         self.metadata.annotations = annotations
 
         # Pod Container
-        self.container = k8s.V1Container(name='base')
+        self.container = k8s.V1Container(name="base")
         self.container.image = image
         self.container.env = []
 
         if envs:
             if isinstance(envs, dict):
                 for key, val in envs.items():
-                    self.container.env.append(k8s.V1EnvVar(
-                        name=key,
-                        value=val
-                    ))
+                    self.container.env.append(k8s.V1EnvVar(name=key, value=val))
             elif isinstance(envs, list):
                 self.container.env.extend(envs)
 
         configmaps = configmaps or []
         self.container.env_from = []
         for configmap in configmaps:
-            self.container.env_from.append(k8s.V1EnvFromSource(
-                config_map_ref=k8s.V1ConfigMapEnvSource(
-                    name=configmap
+            self.container.env_from.append(
+                k8s.V1EnvFromSource(
+                    config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap)
                 )
-            ))
+            )
 
         self.container.command = cmds or []
         self.container.args = args or []
         self.container.image_pull_policy = image_pull_policy
         self.container.ports = ports or []
         self.container.resources = resources
-        self.container.volume_mounts = [v.to_k8s_client_obj() for v in _extract_volume_mounts(volume_mounts)]
+        self.container.volume_mounts = [
+            v.to_k8s_client_obj() for v in _extract_volume_mounts(volume_mounts)
+        ]
 
         # Pod Spec
         self.spec = k8s.V1PodSpec(containers=[])
@@ -288,10 +280,10 @@ class PodGenerator(object):
         self.spec.image_pull_secrets = []
 
         if image_pull_secrets:
-            for image_pull_secret in image_pull_secrets.split(','):
-                self.spec.image_pull_secrets.append(k8s.V1LocalObjectReference(
-                    name=image_pull_secret
-                ))
+            for image_pull_secret in image_pull_secrets.split(","):
+                self.spec.image_pull_secrets.append(
+                    k8s.V1LocalObjectReference(name=image_pull_secret)
+                )
 
         # Attach sidecar
         self.extract_xcom = extract_xcom
@@ -325,7 +317,7 @@ class PodGenerator(object):
             return None
 
         safe_uuid = uuid.uuid4().hex
-        safe_pod_id = dag_id[:MAX_POD_ID_LEN - len(safe_uuid) - 1]
+        safe_pod_id = dag_id[: MAX_POD_ID_LEN - len(safe_uuid) - 1]
         safe_pod_id = safe_pod_id + "-" + safe_uuid
 
         return safe_pod_id
@@ -335,7 +327,9 @@ class PodGenerator(object):
         pod_cp = copy.deepcopy(pod)
         pod_cp.spec.volumes = pod.spec.volumes or []
         pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)
-        pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or []
+        pod_cp.spec.containers[0].volume_mounts = (
+            pod_cp.spec.containers[0].volume_mounts or []
+        )
         pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT)
         pod_cp.spec.containers.append(PodDefaults.SIDECAR_CONTAINER)
 
@@ -351,8 +345,9 @@ class PodGenerator(object):
 
         if not isinstance(obj, dict):
             raise TypeError(
-                'Cannot convert a non-dictionary or non-PodGenerator '
-                'object into a KubernetesExecutorConfig')
+                "Cannot convert a non-dictionary or non-PodGenerator "
+                "object into a KubernetesExecutorConfig"
+            )
 
         # We do not want to extract constant here from ExecutorLoader because it is just
         # A name in dictionary rather than executor selection mechanism and it causes cyclic import
@@ -361,50 +356,55 @@ class PodGenerator(object):
         if not namespaced:
             return None
 
-        resources = namespaced.get('resources')
+        resources = namespaced.get("resources")
 
         if resources is None:
+
             def extract(cpu, memory, ephemeral_storage, limit_gpu=None):
                 resources_obj = {
-                    'cpu': namespaced.pop(cpu, None),
-                    'memory': namespaced.pop(memory, None),
-                    'ephemeral-storage': namespaced.pop(ephemeral_storage, None),
+                    "cpu": namespaced.pop(cpu, None),
+                    "memory": namespaced.pop(memory, None),
+                    "ephemeral-storage": namespaced.pop(ephemeral_storage, None),
                 }
                 if limit_gpu is not None:
-                    resources_obj['nvidia.com/gpu'] = namespaced.pop(limit_gpu, None)
+                    resources_obj["nvidia.com/gpu"] = namespaced.pop(limit_gpu, None)
 
-                resources_obj = {k: v for k, v in resources_obj.items() if v is not None}
+                resources_obj = {
+                    k: v for k, v in resources_obj.items() if v is not None
+                }
 
                 if all(r is None for r in resources_obj):
                     resources_obj = None
                 return namespaced, resources_obj
 
-            namespaced, requests = extract('request_cpu', 'request_memory', 'request_ephemeral_storage')
-            namespaced, limits = extract('limit_cpu', 'limit_memory', 'limit_ephemeral_storage',
-                                         limit_gpu='limit_gpu')
+            namespaced, requests = extract(
+                "request_cpu", "request_memory", "request_ephemeral_storage"
+            )
+            namespaced, limits = extract(
+                "limit_cpu",
+                "limit_memory",
+                "limit_ephemeral_storage",
+                limit_gpu="limit_gpu",
+            )
 
             if requests is None and limits is None:
                 resources = None
             else:
-                resources = k8s.V1ResourceRequirements(
-                    requests=requests,
-                    limits=limits
-                )
+                resources = k8s.V1ResourceRequirements(requests=requests, limits=limits)
         elif isinstance(resources, dict):
             resources = k8s.V1ResourceRequirements(
-                requests=resources['requests'],
-                limits=resources['limits']
+                requests=resources["requests"], limits=resources["limits"]
             )
 
-        annotations = namespaced.get('annotations', {})
-        gcp_service_account_key = namespaced.get('gcp_service_account_key', None)
+        annotations = namespaced.get("annotations", {})
+        gcp_service_account_key = namespaced.get("gcp_service_account_key", None)
 
         if annotations is not None and gcp_service_account_key is not None:
-            annotations.update({
-                'iam.cloud.google.com/service-account': gcp_service_account_key
-            })
+            annotations.update(
+                {"iam.cloud.google.com/service-account": gcp_service_account_key}
+            )
 
-        namespaced['resources'] = resources
+        namespaced["resources"] = resources
         return PodGenerator(**namespaced).gen_pod()
 
     @staticmethod
@@ -426,8 +426,12 @@ class PodGenerator(object):
             return base_pod
 
         client_pod_cp = copy.deepcopy(client_pod)
-        client_pod_cp.spec = PodGenerator.reconcile_specs(base_pod.spec, client_pod_cp.spec)
-        client_pod_cp.metadata = PodGenerator.reconcile_metadata(base_pod.metadata, client_pod_cp.metadata)
+        client_pod_cp.spec = PodGenerator.reconcile_specs(
+            base_pod.spec, client_pod_cp.spec
+        )
+        client_pod_cp.metadata = PodGenerator.reconcile_metadata(
+            base_pod.metadata, client_pod_cp.metadata
+        )
         client_pod_cp = merge_objects(base_pod, client_pod_cp)
 
         return client_pod_cp
@@ -448,17 +452,18 @@ class PodGenerator(object):
             return client_meta
         elif client_meta and base_meta:
             client_meta.labels = merge_objects(base_meta.labels, client_meta.labels)
-            client_meta.annotations = merge_objects(base_meta.annotations, client_meta.annotations)
-            extend_object_field(base_meta, client_meta, 'managed_fields')
-            extend_object_field(base_meta, client_meta, 'finalizers')
-            extend_object_field(base_meta, client_meta, 'owner_references')
+            client_meta.annotations = merge_objects(
+                base_meta.annotations, client_meta.annotations
+            )
+            extend_object_field(base_meta, client_meta, "managed_fields")
+            extend_object_field(base_meta, client_meta, "finalizers")
+            extend_object_field(base_meta, client_meta, "owner_references")
             return merge_objects(base_meta, client_meta)
 
         return None
 
     @staticmethod
-    def reconcile_specs(base_spec,
-                        client_spec):
+    def reconcile_specs(base_spec, client_spec):
         """
         :param base_spec: has the base attributes which are overwritten if they exist
             in the client_spec and remain if they do not exist in the client_spec
@@ -475,14 +480,13 @@ class PodGenerator(object):
             client_spec.containers = PodGenerator.reconcile_containers(
                 base_spec.containers, client_spec.containers
             )
-            merged_spec = extend_object_field(base_spec, client_spec, 'volumes')
+            merged_spec = extend_object_field(base_spec, client_spec, "volumes")
             return merge_objects(base_spec, merged_spec)
 
         return None
 
     @staticmethod
-    def reconcile_containers(base_containers,
-                             client_containers):
+    def reconcile_containers(base_containers, client_containers):
         """
         :param base_containers: has the base attributes which are overwritten if they exist
             in the client_containers and remain if they do not exist in the client_containers
@@ -501,14 +505,18 @@ class PodGenerator(object):
         client_container = client_containers[0]
         base_container = base_containers[0]
         client_container = extend_object_field(
-            base_container,
-            client_container,
-            'volume_mounts',
-            'mount_path')
-        client_container = extend_object_field(base_container, client_container, 'env')
-        client_container = extend_object_field(base_container, client_container, 'env_from')
-        client_container = extend_object_field(base_container, client_container, 'ports')
-        client_container = extend_object_field(base_container, client_container, 'volume_devices')
+            base_container, client_container, "volume_mounts"
+        )
+        client_container = extend_object_field(base_container, client_container, "env")
+        client_container = extend_object_field(
+            base_container, client_container, "env_from"
+        )
+        client_container = extend_object_field(
+            base_container, client_container, "ports"
+        )
+        client_container = extend_object_field(
+            base_container, client_container, "volume_devices"
+        )
         client_container = merge_objects(base_container, client_container)
 
         return [client_container] + PodGenerator.reconcile_containers(
@@ -527,7 +535,7 @@ class PodGenerator(object):
         pod_override_object,
         base_worker_pod,
         namespace,
-        worker_uuid
+        worker_uuid,
     ):
         """
         Construct a pod by gathering and consolidating the configuration from 3 places:
@@ -545,22 +553,22 @@ class PodGenerator(object):
             namespace=namespace,
             image=image,
             labels={
-                'airflow-worker': worker_uuid,
-                'dag_id': make_safe_label_value(dag_id),
-                'task_id': make_safe_label_value(task_id),
-                'execution_date': datetime_to_label_safe_datestring(date),
-                'try_number': str(try_number),
-                'airflow_version': airflow_version.replace('+', '-'),
-                'kubernetes_executor': 'True',
+                "airflow-worker": worker_uuid,
+                "dag_id": make_safe_label_value(dag_id),
+                "task_id": make_safe_label_value(task_id),
+                "execution_date": datetime_to_label_safe_datestring(date),
+                "try_number": str(try_number),
+                "airflow_version": airflow_version.replace("+", "-"),
+                "kubernetes_executor": "True",
             },
             annotations={
-                'dag_id': dag_id,
-                'task_id': task_id,
-                'execution_date': date.isoformat(),
-                'try_number': str(try_number),
+                "dag_id": dag_id,
+                "task_id": task_id,
+                "execution_date": date.isoformat(),
+                "try_number": str(try_number),
             },
             cmds=command,
-            name=pod_id
+            name=pod_id,
         ).gen_pod()
 
         # Reconcile the pods starting with the first chronologically,
@@ -627,7 +635,7 @@ def merge_objects(base_obj, client_obj):
     return client_obj_cp
 
 
-def extend_object_field(base_obj, client_obj, field_name, field_to_merge="name"):
+def extend_object_field(base_obj, client_obj, field_name):
     """
     :param base_obj: an object which has a property `field_name` that is a list
     :param client_obj: an object which has a property `field_name` that is a list.
@@ -640,8 +648,9 @@ def extend_object_field(base_obj, client_obj, field_name, field_to_merge="name")
     base_obj_field = getattr(base_obj, field_name, None)
     client_obj_field = getattr(client_obj, field_name, None)
 
-    if (not isinstance(base_obj_field, list) and base_obj_field is not None) or \
-       (not isinstance(client_obj_field, list) and client_obj_field is not None):
+    if (not isinstance(base_obj_field, list) and base_obj_field is not None) or (
+        not isinstance(client_obj_field, list) and client_obj_field is not None
+    ):
         raise ValueError("The chosen field must be a list.")
 
     if not base_obj_field:
@@ -650,36 +659,6 @@ def extend_object_field(base_obj, client_obj, field_name, field_to_merge="name")
         setattr(client_obj_cp, field_name, base_obj_field)
         return client_obj_cp
 
-    base_obj_set = _get_dict_from_list(base_obj_field, field_to_merge)
-    client_obj_set = _get_dict_from_list(client_obj_field, field_to_merge)
-
-    appended_fields = _merge_list_of_objects(base_obj_set, client_obj_set)
-
+    appended_fields = base_obj_field + client_obj_field
     setattr(client_obj_cp, field_name, appended_fields)
     return client_obj_cp
-
-
-def _merge_list_of_objects(base_obj_set, client_obj_set):
-    for k, v in base_obj_set.items():
-        if k not in client_obj_set:
-            client_obj_set[k] = v
-        else:
-            client_obj_set[k] = merge_objects(v, client_obj_set[k])
-    appended_field_keys = sorted(client_obj_set.keys())
-    appended_fields = [client_obj_set[k] for k in appended_field_keys]
-    return appended_fields
-
-
-def _get_dict_from_list(base_list, field_to_merge="name"):
-    """
-    :type base_list: list(Optional[dict, *to_dict])
-    """
-    result = {}
-    for obj in base_list:
-        if isinstance(obj, dict):
-            result[obj[field_to_merge]] = obj
-        elif hasattr(obj, "to_dict"):
-            result[getattr(obj, field_to_merge)] = obj
-        else:
-            raise AirflowConfigException("Trying to merge invalid object {}".format(obj))
-    return result
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index fed7c97..df7afdf 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -25,532 +25,574 @@ from kubernetes.client import ApiClient, models as k8s
 
 from airflow.kubernetes.k8s_model import append_to_pod
 from airflow.kubernetes.pod import Resources
-from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator, extend_object_field, merge_objects, \
-    datetime_to_label_safe_datestring
+from airflow.kubernetes.pod_generator import (
+    PodDefaults,
+    PodGenerator,
+    extend_object_field,
+    merge_objects,
+    datetime_to_label_safe_datestring,
+)
 from airflow.kubernetes.secret import Secret
 
 
 class TestPodGenerator(unittest.TestCase):
-
     def setUp(self):
-        self.static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48')
+        self.static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48")
         self.deserialize_result = {
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {'name': 'memory-demo', 'namespace': 'mem-example'},
-            'spec': {
-                'containers': [{
-                    'args': ['--vm', '1', '--vm-bytes', '150M', '--vm-hang', '1'],
-                    'command': ['stress'],
-                    'image': 'apache/airflow:stress-2020.07.10-1.0.4',
-                    'name': 'memory-demo-ctr',
-                    'resources': {
-                        'limits': {'memory': '200Mi'},
-                        'requests': {'memory': '100Mi'}
+            "apiVersion": "v1",
+            "kind": "Pod",
+            "metadata": {"name": "memory-demo", "namespace": "mem-example"},
+            "spec": {
+                "containers": [
+                    {
+                        "args": ["--vm", "1", "--vm-bytes", "150M", "--vm-hang", "1"],
+                        "command": ["stress"],
+                        "image": "apache/airflow:stress-2020.07.10-1.0.4",
+                        "name": "memory-demo-ctr",
+                        "resources": {
+                            "limits": {"memory": "200Mi"},
+                            "requests": {"memory": "100Mi"},
+                        },
                     }
-                }]
-            }
+                ]
+            },
         }
 
-        self.envs = {
-            'ENVIRONMENT': 'prod',
-            'LOG_LEVEL': 'warning'
-        }
+        self.envs = {"ENVIRONMENT": "prod", "LOG_LEVEL": "warning"}
         self.secrets = [
             # This should be a secretRef
-            Secret('env', None, 'secret_a'),
+            Secret("env", None, "secret_a"),
             # This should be a single secret mounted in volumeMounts
-            Secret('volume', '/etc/foo', 'secret_b'),
+            Secret("volume", "/etc/foo", "secret_b"),
             # This should produce a single secret mounted in env
-            Secret('env', 'TARGET', 'secret_b', 'source_b'),
+            Secret("env", "TARGET", "secret_b", "source_b"),
         ]
 
-        self.execution_date = parser.parse('2020-08-24 00:00:00.000000')
-        self.execution_date_label = datetime_to_label_safe_datestring(self.execution_date)
-        self.dag_id = 'dag_id'
-        self.task_id = 'task_id'
+        self.execution_date = parser.parse("2020-08-24 00:00:00.000000")
+        self.execution_date_label = datetime_to_label_safe_datestring(
+            self.execution_date
+        )
+        self.dag_id = "dag_id"
+        self.task_id = "task_id"
         self.try_number = 3
         self.labels = {
-            'airflow-worker': 'uuid',
-            'dag_id': 'dag_id',
-            'execution_date': mock.ANY,
-            'task_id': 'task_id',
-            'try_number': '3',
-            'airflow_version': mock.ANY,
-            'kubernetes_executor': 'True'
+            "airflow-worker": "uuid",
+            "dag_id": "dag_id",
+            "execution_date": mock.ANY,
+            "task_id": "task_id",
+            "try_number": "3",
+            "airflow_version": mock.ANY,
+            "kubernetes_executor": "True",
         }
         self.metadata = {
-            'annotations': {'dag_id': 'dag_id',
-                            'execution_date': '2020-08-24T00:00:00',
-                            'task_id': 'task_id',
-                            'try_number': '3'},
-            'labels': self.labels,
-            'name': 'pod_id-' + self.static_uuid.hex,
-            'namespace': 'namespace'
+            "annotations": {
+                "dag_id": "dag_id",
+                "execution_date": "2020-08-24T00:00:00",
+                "task_id": "task_id",
+                "try_number": "3",
+            },
+            "labels": self.labels,
+            "name": "pod_id-" + self.static_uuid.hex,
+            "namespace": "namespace",
         }
 
-        self.resources = Resources('1Gi', 1, '2Gi', '2Gi', 2, 1, '4Gi')
+        self.resources = Resources("1Gi", 1, "2Gi", "2Gi", 2, 1, "4Gi")
         self.k8s_client = ApiClient()
         self.expected = {
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {
-                'name': 'myapp-pod-' + self.static_uuid.hex,
-                'labels': {'app': 'myapp'},
-                'namespace': 'default'
+            "apiVersion": "v1",
+            "kind": "Pod",
+            "metadata": {
+                "name": "myapp-pod-" + self.static_uuid.hex,
+                "labels": {"app": "myapp"},
+                "namespace": "default",
             },
-            'spec': {
-                'containers': [{
-                    'name': 'base',
-                    'image': 'busybox',
-                    'args': [],
-                    'command': [
-                        'sh', '-c', 'echo Hello Kubernetes!'
-                    ],
-                    'env': [{
-                        'name': 'ENVIRONMENT',
-                        'value': 'prod'
-                    }, {
-                        'name': 'LOG_LEVEL',
-                        'value': 'warning'
-                    }, {
-                        'name': 'TARGET',
-                        'valueFrom': {
-                            'secretKeyRef': {
-                                'name': 'secret_b',
-                                'key': 'source_b'
-                            }
-                        }
-                    }],
-                    'envFrom': [{
-                        'configMapRef': {
-                            'name': 'configmap_a'
-                        }
-                    }, {
-                        'configMapRef': {
-                            'name': 'configmap_b'
-                        }
-                    }, {
-                        'secretRef': {
-                            'name': 'secret_a'
-                        }
-                    }],
-                    'resources': {
-                        'requests': {
-                            'memory': '1Gi',
-                            'cpu': 1,
-                            'ephemeral-storage': '2Gi'
-                        },
-                        'limits': {
-                            'memory': '2Gi',
-                            'cpu': 2,
-                            'nvidia.com/gpu': 1,
-                            'ephemeral-storage': '4Gi'
+            "spec": {
+                "containers": [
+                    {
+                        "name": "base",
+                        "image": "busybox",
+                        "args": [],
+                        "command": ["sh", "-c", "echo Hello Kubernetes!"],
+                        "env": [
+                            {"name": "ENVIRONMENT", "value": "prod"},
+                            {"name": "LOG_LEVEL", "value": "warning"},
+                            {
+                                "name": "TARGET",
+                                "valueFrom": {
+                                    "secretKeyRef": {
+                                        "name": "secret_b",
+                                        "key": "source_b",
+                                    }
+                                },
+                            },
+                        ],
+                        "envFrom": [
+                            {"configMapRef": {"name": "configmap_a"}},
+                            {"configMapRef": {"name": "configmap_b"}},
+                            {"secretRef": {"name": "secret_a"}},
+                        ],
+                        "resources": {
+                            "requests": {
+                                "memory": "1Gi",
+                                "cpu": 1,
+                                "ephemeral-storage": "2Gi",
+                            },
+                            "limits": {
+                                "memory": "2Gi",
+                                "cpu": 2,
+                                "nvidia.com/gpu": 1,
+                                "ephemeral-storage": "4Gi",
+                            },
                         },
-                    },
-                    'ports': [{'name': 'foo', 'containerPort': 1234}],
-                    'volumeMounts': [{
-                        'mountPath': '/etc/foo',
-                        'name': 'secretvol' + str(self.static_uuid),
-                        'readOnly': True
-                    }]
-                }],
-                'volumes': [{
-                    'name': 'secretvol' + str(self.static_uuid),
-                    'secret': {
-                        'secretName': 'secret_b'
+                        "ports": [{"name": "foo", "containerPort": 1234}],
+                        "volumeMounts": [
+                            {
+                                "mountPath": "/etc/foo",
+                                "name": "secretvol" + str(self.static_uuid),
+                                "readOnly": True,
+                            }
+                        ],
                     }
-                }],
-                'hostNetwork': False,
-                'imagePullSecrets': [
-                    {'name': 'pull_secret_a'},
-                    {'name': 'pull_secret_b'}
                 ],
-                'securityContext': {
-                    'runAsUser': 1000,
-                    'fsGroup': 2000,
+                "volumes": [
+                    {
+                        "name": "secretvol" + str(self.static_uuid),
+                        "secret": {"secretName": "secret_b"},
+                    }
+                ],
+                "hostNetwork": False,
+                "imagePullSecrets": [
+                    {"name": "pull_secret_a"},
+                    {"name": "pull_secret_b"},
+                ],
+                "securityContext": {
+                    "runAsUser": 1000,
+                    "fsGroup": 2000,
                 },
-            }
+            },
         }
 
-    @mock.patch('uuid.uuid4')
+    @mock.patch("uuid.uuid4")
     def test_gen_pod(self, mock_uuid):
         mock_uuid.return_value = self.static_uuid
         pod_generator = PodGenerator(
-            labels={'app': 'myapp'},
-            name='myapp-pod',
-            image_pull_secrets='pull_secret_a,pull_secret_b',
-            image='busybox',
+            labels={"app": "myapp"},
+            name="myapp-pod",
+            image_pull_secrets="pull_secret_a,pull_secret_b",
+            image="busybox",
             envs=self.envs,
-            cmds=['sh', '-c', 'echo Hello Kubernetes!'],
+            cmds=["sh", "-c", "echo Hello Kubernetes!"],
             security_context=k8s.V1PodSecurityContext(
                 run_as_user=1000,
                 fs_group=2000,
             ),
-            namespace='default',
-            ports=[k8s.V1ContainerPort(name='foo', container_port=1234)],
-            configmaps=['configmap_a', 'configmap_b']
+            namespace="default",
+            ports=[k8s.V1ContainerPort(name="foo", container_port=1234)],
+            configmaps=["configmap_a", "configmap_b"],
         )
         result = pod_generator.gen_pod()
         result = append_to_pod(result, self.secrets)
         result = self.resources.attach_to_pod(result)
         result_dict = self.k8s_client.sanitize_for_serialization(result)
         # sort
-        result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
-        result_dict['spec']['containers'][0]['envFrom'].sort(
-            key=lambda x: list(x.values())[0]['name']
+        result_dict["spec"]["containers"][0]["env"].sort(key=lambda x: x["name"])
+        result_dict["spec"]["containers"][0]["envFrom"].sort(
+            key=lambda x: list(x.values())[0]["name"]
         )
         self.assertDictEqual(self.expected, result_dict)
 
-    @mock.patch('uuid.uuid4')
+    @mock.patch("uuid.uuid4")
     def test_gen_pod_extract_xcom(self, mock_uuid):
         mock_uuid.return_value = self.static_uuid
         pod_generator = PodGenerator(
-            labels={'app': 'myapp'},
-            name='myapp-pod',
-            image_pull_secrets='pull_secret_a,pull_secret_b',
-            image='busybox',
+            labels={"app": "myapp"},
+            name="myapp-pod",
+            image_pull_secrets="pull_secret_a,pull_secret_b",
+            image="busybox",
             envs=self.envs,
-            cmds=['sh', '-c', 'echo Hello Kubernetes!'],
-            namespace='default',
+            cmds=["sh", "-c", "echo Hello Kubernetes!"],
+            namespace="default",
             security_context=k8s.V1PodSecurityContext(
                 run_as_user=1000,
                 fs_group=2000,
             ),
-            ports=[k8s.V1ContainerPort(name='foo', container_port=1234)],
-            configmaps=['configmap_a', 'configmap_b'],
-            extract_xcom=True
+            ports=[k8s.V1ContainerPort(name="foo", container_port=1234)],
+            configmaps=["configmap_a", "configmap_b"],
+            extract_xcom=True,
         )
         result = pod_generator.gen_pod()
         result = append_to_pod(result, self.secrets)
         result = self.resources.attach_to_pod(result)
         result_dict = self.k8s_client.sanitize_for_serialization(result)
         container_two = {
-            'name': 'airflow-xcom-sidecar',
-            'image': "alpine",
-            'command': ['sh', '-c', PodDefaults.XCOM_CMD],
-            'volumeMounts': [
-                {
-                    'name': 'xcom',
-                    'mountPath': '/airflow/xcom'
-                }
-            ],
-            'resources': {'requests': {'cpu': '1m'}},
+            "name": "airflow-xcom-sidecar",
+            "image": "alpine",
+            "command": ["sh", "-c", PodDefaults.XCOM_CMD],
+            "volumeMounts": [{"name": "xcom", "mountPath": "/airflow/xcom"}],
+            "resources": {"requests": {"cpu": "1m"}},
         }
-        self.expected['spec']['containers'].append(container_two)
-        self.expected['spec']['containers'][0]['volumeMounts'].insert(0, {
-            'name': 'xcom',
-            'mountPath': '/airflow/xcom'
-        })
-        self.expected['spec']['volumes'].insert(0, {
-            'name': 'xcom', 'emptyDir': {}
-        })
-        result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
+        self.expected["spec"]["containers"].append(container_two)
+        self.expected["spec"]["containers"][0]["volumeMounts"].insert(
+            0, {"name": "xcom", "mountPath": "/airflow/xcom"}
+        )
+        self.expected["spec"]["volumes"].insert(0, {"name": "xcom", "emptyDir": {}})
+        result_dict["spec"]["containers"][0]["env"].sort(key=lambda x: x["name"])
         self.assertEqual(result_dict, self.expected)
 
-    @mock.patch('uuid.uuid4')
+    @mock.patch("uuid.uuid4")
     def test_from_obj(self, mock_uuid):
         mock_uuid.return_value = self.static_uuid
-        result = PodGenerator.from_obj({
-            "KubernetesExecutor": {
-                "annotations": {"test": "annotation"},
-                "volumes": [
-                    {
-                        "name": "example-kubernetes-test-volume",
-                        "hostPath": {"path": "/tmp/"},
-                    },
-                ],
-                "volume_mounts": [
-                    {
-                        "mountPath": "/foo/",
-                        "name": "example-kubernetes-test-volume",
-                    },
-                ],
-                "resources": {
-                    "requests": {
-                        "memory": "256Mi",
-                        "cpu": "500m",
-                        "ephemeral-storage": "2G",
-                        "nvidia.com/gpu": "0"
+        result = PodGenerator.from_obj(
+            {
+                "KubernetesExecutor": {
+                    "annotations": {"test": "annotation"},
+                    "volumes": [
+                        {
+                            "name": "example-kubernetes-test-volume",
+                            "hostPath": {"path": "/tmp/"},
+                        },
+                    ],
+                    "volume_mounts": [
+                        {
+                            "mountPath": "/foo/",
+                            "name": "example-kubernetes-test-volume",
+                        },
+                    ],
+                    "resources": {
+                        "requests": {
+                            "memory": "256Mi",
+                            "cpu": "500m",
+                            "ephemeral-storage": "2G",
+                            "nvidia.com/gpu": "0",
+                        },
+                        "limits": {
+                            "memory": "512Mi",
+                            "cpu": "1000m",
+                            "ephemeral-storage": "2G",
+                            "nvidia.com/gpu": "0",
+                        },
                     },
-                    "limits": {
-                        "memory": "512Mi",
-                        "cpu": "1000m",
-                        "ephemeral-storage": "2G",
-                        "nvidia.com/gpu": "0"
-                    }
                 }
             }
-        })
+        )
         result = self.k8s_client.sanitize_for_serialization(result)
 
-        self.assertEqual({
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {
-                'annotations': {'test': 'annotation'},
+        self.assertEqual(
+            {
+                "apiVersion": "v1",
+                "kind": "Pod",
+                "metadata": {
+                    "annotations": {"test": "annotation"},
+                },
+                "spec": {
+                    "containers": [
+                        {
+                            "args": [],
+                            "command": [],
+                            "env": [],
+                            "envFrom": [],
+                            "name": "base",
+                            "ports": [],
+                            "volumeMounts": [
+                                {
+                                    "mountPath": "/foo/",
+                                    "name": "example-kubernetes-test-volume",
+                                }
+                            ],
+                            "resources": {
+                                "requests": {
+                                    "memory": "256Mi",
+                                    "cpu": "500m",
+                                    "ephemeral-storage": "2G",
+                                    "nvidia.com/gpu": "0",
+                                },
+                                "limits": {
+                                    "memory": "512Mi",
+                                    "cpu": "1000m",
+                                    "ephemeral-storage": "2G",
+                                    "nvidia.com/gpu": "0",
+                                },
+                            },
+                        }
+                    ],
+                    "hostNetwork": False,
+                    "imagePullSecrets": [],
+                    "volumes": [
+                        {
+                            "hostPath": {"path": "/tmp/"},
+                            "name": "example-kubernetes-test-volume",
+                        }
+                    ],
+                },
             },
-            'spec': {
-                'containers': [{
-                    'args': [],
-                    'command': [],
-                    'env': [],
-                    'envFrom': [],
-                    'name': 'base',
-                    'ports': [],
-                    'volumeMounts': [{
-                        'mountPath': '/foo/',
-                        'name': 'example-kubernetes-test-volume'
-                    }],
+            result,
+        )
+
+    @mock.patch("uuid.uuid4")
+    def test_from_obj_with_resources_object(self, mock_uuid):
+        mock_uuid.return_value = self.static_uuid
+        result = PodGenerator.from_obj(
+            {
+                "KubernetesExecutor": {
+                    "annotations": {"test": "annotation"},
+                    "volumes": [
+                        {
+                            "name": "example-kubernetes-test-volume",
+                            "hostPath": {"path": "/tmp/"},
+                        },
+                    ],
+                    "volume_mounts": [
+                        {
+                            "mountPath": "/foo/",
+                            "name": "example-kubernetes-test-volume",
+                        },
+                    ],
                     "resources": {
                         "requests": {
                             "memory": "256Mi",
                             "cpu": "500m",
                             "ephemeral-storage": "2G",
-                            "nvidia.com/gpu": "0"
+                            "nvidia.com/gpu": "0",
                         },
                         "limits": {
                             "memory": "512Mi",
                             "cpu": "1000m",
                             "ephemeral-storage": "2G",
-                            "nvidia.com/gpu": "0"
-                        }
-                    }
-                }],
-                'hostNetwork': False,
-                'imagePullSecrets': [],
-                'volumes': [{
-                    'hostPath': {'path': '/tmp/'},
-                    'name': 'example-kubernetes-test-volume'
-                }],
-            }
-        }, result)
-
-    @mock.patch('uuid.uuid4')
-    def test_from_obj_with_resources_object(self, mock_uuid):
-        mock_uuid.return_value = self.static_uuid
-        result = PodGenerator.from_obj({
-            "KubernetesExecutor": {
-                "annotations": {"test": "annotation"},
-                "volumes": [
-                    {
-                        "name": "example-kubernetes-test-volume",
-                        "hostPath": {"path": "/tmp/"},
-                    },
-                ],
-                "volume_mounts": [
-                    {
-                        "mountPath": "/foo/",
-                        "name": "example-kubernetes-test-volume",
-                    },
-                ],
-                "resources": {
-                    "requests": {
-                        "memory": "256Mi",
-                        "cpu": "500m",
-                        "ephemeral-storage": "2G",
-                        "nvidia.com/gpu": "0"
+                            "nvidia.com/gpu": "0",
+                        },
                     },
-                    "limits": {
-                        "memory": "512Mi",
-                        "cpu": "1000m",
-                        "ephemeral-storage": "2G",
-                        "nvidia.com/gpu": "0"
-                    }
                 }
             }
-        })
+        )
         result = self.k8s_client.sanitize_for_serialization(result)
 
-        self.assertEqual({
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {
-                'annotations': {'test': 'annotation'},
+        self.assertEqual(
+            {
+                "apiVersion": "v1",
+                "kind": "Pod",
+                "metadata": {
+                    "annotations": {"test": "annotation"},
+                },
+                "spec": {
+                    "containers": [
+                        {
+                            "args": [],
+                            "command": [],
+                            "env": [],
+                            "envFrom": [],
+                            "name": "base",
+                            "ports": [],
+                            "volumeMounts": [
+                                {
+                                    "mountPath": "/foo/",
+                                    "name": "example-kubernetes-test-volume",
+                                }
+                            ],
+                            "resources": {
+                                "limits": {
+                                    "cpu": "1000m",
+                                    "ephemeral-storage": "2G",
+                                    "memory": "512Mi",
+                                    "nvidia.com/gpu": "0",
+                                },
+                                "requests": {
+                                    "cpu": "500m",
+                                    "ephemeral-storage": "2G",
+                                    "memory": "256Mi",
+                                    "nvidia.com/gpu": "0",
+                                },
+                            },
+                        }
+                    ],
+                    "hostNetwork": False,
+                    "imagePullSecrets": [],
+                    "volumes": [
+                        {
+                            "hostPath": {"path": "/tmp/"},
+                            "name": "example-kubernetes-test-volume",
+                        }
+                    ],
+                },
             },
-            'spec': {
-                'containers': [{
-                    'args': [],
-                    'command': [],
-                    'env': [],
-                    'envFrom': [],
-                    'name': 'base',
-                    'ports': [],
-                    'volumeMounts': [{
-                        'mountPath': '/foo/',
-                        'name': 'example-kubernetes-test-volume'
-                    }],
-                    'resources': {'limits': {'cpu': '1000m',
-                                             'ephemeral-storage': '2G',
-                                             'memory': '512Mi',
-                                             'nvidia.com/gpu': '0'},
-                                  'requests': {'cpu': '500m',
-                                               'ephemeral-storage': '2G',
-                                               'memory': '256Mi',
-                                               'nvidia.com/gpu': '0'}},
-                }],
-                'hostNetwork': False,
-                'imagePullSecrets': [],
-                'volumes': [{
-                    'hostPath': {'path': '/tmp/'},
-                    'name': 'example-kubernetes-test-volume'
-                }],
-            }
-        }, result)
+            result,
+        )
 
-    @mock.patch('uuid.uuid4')
+    @mock.patch("uuid.uuid4")
     def test_from_obj_with_resources(self, mock_uuid):
         self.maxDiff = None
 
         mock_uuid.return_value = self.static_uuid
-        result = PodGenerator.from_obj({
-            "KubernetesExecutor": {
-                "annotations": {"test": "annotation"},
-                "volumes": [
-                    {
-                        "name": "example-kubernetes-test-volume",
-                        "hostPath": {"path": "/tmp/"},
-                    },
-                ],
-                "volume_mounts": [
-                    {
-                        "mountPath": "/foo/",
-                        "name": "example-kubernetes-test-volume",
-                    },
-                ],
-                'request_cpu': "200m",
-                'limit_cpu': "400m",
-                'request_memory': "500Mi",
-                'limit_memory': "1000Mi",
-                'limit_gpu': "2",
-                'request_ephemeral_storage': '2Gi',
-                'limit_ephemeral_storage': '4Gi',
+        result = PodGenerator.from_obj(
+            {
+                "KubernetesExecutor": {
+                    "annotations": {"test": "annotation"},
+                    "volumes": [
+                        {
+                            "name": "example-kubernetes-test-volume",
+                            "hostPath": {"path": "/tmp/"},
+                        },
+                    ],
+                    "volume_mounts": [
+                        {
+                            "mountPath": "/foo/",
+                            "name": "example-kubernetes-test-volume",
+                        },
+                    ],
+                    "request_cpu": "200m",
+                    "limit_cpu": "400m",
+                    "request_memory": "500Mi",
+                    "limit_memory": "1000Mi",
+                    "limit_gpu": "2",
+                    "request_ephemeral_storage": "2Gi",
+                    "limit_ephemeral_storage": "4Gi",
+                }
             }
-        })
+        )
         result = self.k8s_client.sanitize_for_serialization(result)
 
-        self.assertEqual({
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {
-                'annotations': {'test': 'annotation'},
+        self.assertEqual(
+            {
+                "apiVersion": "v1",
+                "kind": "Pod",
+                "metadata": {
+                    "annotations": {"test": "annotation"},
+                },
+                "spec": {
+                    "containers": [
+                        {
+                            "args": [],
+                            "command": [],
+                            "env": [],
+                            "envFrom": [],
+                            "name": "base",
+                            "ports": [],
+                            "resources": {
+                                "limits": {
+                                    "cpu": "400m",
+                                    "ephemeral-storage": "4Gi",
+                                    "memory": "1000Mi",
+                                    "nvidia.com/gpu": "2",
+                                },
+                                "requests": {
+                                    "cpu": "200m",
+                                    "ephemeral-storage": "2Gi",
+                                    "memory": "500Mi",
+                                },
+                            },
+                            "volumeMounts": [
+                                {
+                                    "mountPath": "/foo/",
+                                    "name": "example-kubernetes-test-volume",
+                                }
+                            ],
+                        }
+                    ],
+                    "hostNetwork": False,
+                    "imagePullSecrets": [],
+                    "volumes": [
+                        {
+                            "hostPath": {"path": "/tmp/"},
+                            "name": "example-kubernetes-test-volume",
+                        }
+                    ],
+                },
             },
-            'spec': {
-                'containers': [{
-                    'args': [],
-                    'command': [],
-                    'env': [],
-                    'envFrom': [],
-                    'name': 'base',
-                    'ports': [],
-                    'resources': {
-                        'limits': {
-                            'cpu': '400m',
-                            'ephemeral-storage': '4Gi',
-                            'memory': '1000Mi',
-                            'nvidia.com/gpu': "2",
-                        },
-                        'requests': {
-                            'cpu': '200m',
-                            'ephemeral-storage': '2Gi',
-                            'memory': '500Mi',
-                        },
-                    },
-                    'volumeMounts': [{
-                        'mountPath': '/foo/',
-                        'name': 'example-kubernetes-test-volume'
-                    }],
-                }],
-                'hostNetwork': False,
-                'imagePullSecrets': [],
-                'volumes': [{
-                    'hostPath': {'path': '/tmp/'},
-                    'name': 'example-kubernetes-test-volume'
-                }],
-            }
-        }, result)
+            result,
+        )
 
-    @mock.patch('uuid.uuid4')
+    @mock.patch("uuid.uuid4")
     def test_from_obj_with_only_request_resources(self, mock_uuid):
         self.maxDiff = None
 
         mock_uuid.return_value = self.static_uuid
-        result = PodGenerator.from_obj({
-            "KubernetesExecutor": {
-                "annotations": {"test": "annotation"},
-                "volumes": [
-                    {
-                        "name": "example-kubernetes-test-volume",
-                        "hostPath": {"path": "/tmp/"},
-                    },
-                ],
-                "volume_mounts": [
-                    {
-                        "mountPath": "/foo/",
-                        "name": "example-kubernetes-test-volume",
-                    },
-                ],
-                'request_cpu': "200m",
-                'request_memory': "500Mi",
+        result = PodGenerator.from_obj(
+            {
+                "KubernetesExecutor": {
+                    "annotations": {"test": "annotation"},
+                    "volumes": [
+                        {
+                            "name": "example-kubernetes-test-volume",
+                            "hostPath": {"path": "/tmp/"},
+                        },
+                    ],
+                    "volume_mounts": [
+                        {
+                            "mountPath": "/foo/",
+                            "name": "example-kubernetes-test-volume",
+                        },
+                    ],
+                    "request_cpu": "200m",
+                    "request_memory": "500Mi",
+                }
             }
-        })
+        )
         result = self.k8s_client.sanitize_for_serialization(result)
 
-        self.assertEqual({
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {
-                'annotations': {'test': 'annotation'},
+        self.assertEqual(
+            {
+                "apiVersion": "v1",
+                "kind": "Pod",
+                "metadata": {
+                    "annotations": {"test": "annotation"},
+                },
+                "spec": {
+                    "containers": [
+                        {
+                            "args": [],
+                            "command": [],
+                            "env": [],
+                            "envFrom": [],
+                            "name": "base",
+                            "ports": [],
+                            "resources": {
+                                "requests": {
+                                    "cpu": "200m",
+                                    "memory": "500Mi",
+                                },
+                            },
+                            "volumeMounts": [
+                                {
+                                    "mountPath": "/foo/",
+                                    "name": "example-kubernetes-test-volume",
+                                }
+                            ],
+                        }
+                    ],
+                    "hostNetwork": False,
+                    "imagePullSecrets": [],
+                    "volumes": [
+                        {
+                            "hostPath": {"path": "/tmp/"},
+                            "name": "example-kubernetes-test-volume",
+                        }
+                    ],
+                },
             },
-            'spec': {
-                'containers': [{
-                    'args': [],
-                    'command': [],
-                    'env': [],
-                    'envFrom': [],
-                    'name': 'base',
-                    'ports': [],
-                    'resources': {
-                        'requests': {
-                            'cpu': '200m',
-                            'memory': '500Mi',
-                        },
-                    },
-                    'volumeMounts': [{
-                        'mountPath': '/foo/',
-                        'name': 'example-kubernetes-test-volume'
-                    }],
-                }],
-                'hostNetwork': False,
-                'imagePullSecrets': [],
-                'volumes': [{
-                    'hostPath': {'path': '/tmp/'},
-                    'name': 'example-kubernetes-test-volume'
-                }],
-            }
-        }, result)
+            result,
+        )
 
-    @mock.patch('uuid.uuid4')
+    @mock.patch("uuid.uuid4")
     def test_reconcile_pods_empty_mutator_pod(self, mock_uuid):
         mock_uuid.return_value = self.static_uuid
         base_pod = PodGenerator(
-            image='image1',
-            name='name1',
-            envs={'key1': 'val1'},
-            cmds=['/bin/command1.sh', 'arg1'],
-            ports=[k8s.V1ContainerPort(name='port', container_port=2118)],
-            volumes=[{
-                'hostPath': {'path': '/tmp/'},
-                'name': 'example-kubernetes-test-volume1'
-            }],
-            volume_mounts=[{
-                'mountPath': '/foo/',
-                'name': 'example-kubernetes-test-volume1'
-            }],
+            image="image1",
+            name="name1",
+            envs={"key1": "val1"},
+            cmds=["/bin/command1.sh", "arg1"],
+            ports=[k8s.V1ContainerPort(name="port", container_port=2118)],
+            volumes=[
+                {
+                    "hostPath": {"path": "/tmp/"},
+                    "name": "example-kubernetes-test-volume1",
+                }
+            ],
+            volume_mounts=[
+                {"mountPath": "/foo/", "name": "example-kubernetes-test-volume1"}
+            ],
         ).gen_pod()
 
         mutator_pod = None
-        name = 'name1-' + self.static_uuid.hex
+        name = "name1-" + self.static_uuid.hex
 
         base_pod.metadata.name = name
 
@@ -561,92 +603,107 @@ class TestPodGenerator(unittest.TestCase):
         result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
         self.assertEqual(base_pod, result)
 
-    @mock.patch('uuid.uuid4')
+    @mock.patch("uuid.uuid4")
     def test_reconcile_pods(self, mock_uuid):
         mock_uuid.return_value = self.static_uuid
         base_pod = PodGenerator(
-            image='image1',
-            name='name1',
-            envs={'key1': 'val1'},
-            cmds=['/bin/command1.sh', 'arg1'],
-            ports=[k8s.V1ContainerPort(name='port', container_port=2118)],
-            volumes=[{
-                'hostPath': {'path': '/tmp/'},
-                'name': 'example-kubernetes-test-volume1'
-            }],
-            volume_mounts=[{
-                'mountPath': '/foo/',
-                'name': 'example-kubernetes-test-volume1'
-            }],
+            image="image1",
+            name="name1",
+            envs={"key1": "val1"},
+            cmds=["/bin/command1.sh", "arg1"],
+            ports=[k8s.V1ContainerPort(name="port", container_port=2118)],
+            volumes=[
+                {
+                    "hostPath": {"path": "/tmp/"},
+                    "name": "example-kubernetes-test-volume1",
+                }
+            ],
+            volume_mounts=[
+                {"mountPath": "/foo/", "name": "example-kubernetes-test-volume1"}
+            ],
         ).gen_pod()
 
         mutator_pod = PodGenerator(
-            envs={'key2': 'val2'},
-            image='',
-            name='name2',
-            cmds=['/bin/command2.sh', 'arg2'],
-            volumes=[{
-                'hostPath': {'path': '/tmp/'},
-                'name': 'example-kubernetes-test-volume2'
-            }],
-            volume_mounts=[{
-                'mountPath': '/foo/',
-                'name': 'example-kubernetes-test-volume2'
-            }]
+            envs={"key2": "val2"},
+            image="",
+            name="name2",
+            cmds=["/bin/command2.sh", "arg2"],
+            volumes=[
+                {
+                    "hostPath": {"path": "/tmp/"},
+                    "name": "example-kubernetes-test-volume2",
+                }
+            ],
+            volume_mounts=[
+                {"mountPath": "/foo/", "name": "example-kubernetes-test-volume2"}
+            ],
         ).gen_pod()
 
         result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
         result = self.k8s_client.sanitize_for_serialization(result)
-        self.assertEqual({
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {'name': 'name2-' + self.static_uuid.hex},
-            'spec': {
-                'containers': [{
-                    'args': [],
-                    'command': ['/bin/command2.sh', 'arg2'],
-                    'env': [
-                        {'name': 'key1', 'value': 'val1'},
-                        {'name': 'key2', 'value': 'val2'}
+        self.assertEqual(
+            {
+                "apiVersion": "v1",
+                "kind": "Pod",
+                "metadata": {"name": "name2-" + self.static_uuid.hex},
+                "spec": {
+                    "containers": [
+                        {
+                            "args": [],
+                            "command": ["/bin/command2.sh", "arg2"],
+                            "env": [
+                                {"name": "key1", "value": "val1"},
+                                {"name": "key2", "value": "val2"},
+                            ],
+                            "envFrom": [],
+                            "image": "image1",
+                            "name": "base",
+                            "ports": [
+                                {
+                                    "containerPort": 2118,
+                                    "name": "port",
+                                }
+                            ],
+                            "volumeMounts": [
+                                {
+                                    "mountPath": "/foo/",
+                                    "name": "example-kubernetes-test-volume1",
+                                },
+                                {
+                                    "mountPath": "/foo/",
+                                    "name": "example-kubernetes-test-volume2",
+                                },
+                            ],
+                        }
                     ],
-                    'envFrom': [],
-                    'image': 'image1',
-                    'name': 'base',
-                    'ports': [{
-                        'containerPort': 2118,
-                        'name': 'port',
-                    }],
-                    'volumeMounts': [{
-                        'mountPath': '/foo/',
-                        'name': 'example-kubernetes-test-volume2'
-                    }]
-                }],
-                'hostNetwork': False,
-                'imagePullSecrets': [],
-                'volumes': [{
-                    'hostPath': {'path': '/tmp/'},
-                    'name': 'example-kubernetes-test-volume1'
-                }, {
-                    'hostPath': {'path': '/tmp/'},
-                    'name': 'example-kubernetes-test-volume2'
-                }]
-            }
-        }, result)
+                    "hostNetwork": False,
+                    "imagePullSecrets": [],
+                    "volumes": [
+                        {
+                            "hostPath": {"path": "/tmp/"},
+                            "name": "example-kubernetes-test-volume1",
+                        },
+                        {
+                            "hostPath": {"path": "/tmp/"},
+                            "name": "example-kubernetes-test-volume2",
+                        },
+                    ],
+                },
+            },
+            result,
+        )
 
-    @mock.patch('uuid.uuid4')
+    @mock.patch("uuid.uuid4")
     def test_construct_pod_empty_worker_config(self, mock_uuid):
         mock_uuid.return_value = self.static_uuid
         executor_config = k8s.V1Pod(
             spec=k8s.V1PodSpec(
                 containers=[
                     k8s.V1Container(
-                        name='',
+                        name="",
                         resources=k8s.V1ResourceRequirements(
-                            limits={
-                                'cpu': '1m',
-                                'memory': '1G'
-                            }
-                        )
+                            limits={"cpu": "1m", "memory": "1G"}
+                        ),
                     )
                 ]
             )
@@ -654,61 +711,58 @@ class TestPodGenerator(unittest.TestCase):
         worker_config = k8s.V1Pod()
 
         result = PodGenerator.construct_pod(
-            'dag_id',
-            'task_id',
-            'pod_id',
+            "dag_id",
+            "task_id",
+            "pod_id",
             self.try_number,
             "kube_image",
             self.execution_date,
-            ['command'],
+            ["command"],
             executor_config,
             worker_config,
-            'namespace',
-            'uuid',
+            "namespace",
+            "uuid",
         )
         sanitized_result = self.k8s_client.sanitize_for_serialization(result)
 
-        self.assertEqual({
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': self.metadata,
-            'spec': {
-                'containers': [{
-                    'args': [],
-                    'command': ['command'],
-                    'env': [],
-                    'envFrom': [],
-                    'name': 'base',
-                    'image': 'kube_image',
-                    'ports': [],
-                    'resources': {
-                        'limits': {
-                            'cpu': '1m',
-                            'memory': '1G'
+        self.assertEqual(
+            {
+                "apiVersion": "v1",
+                "kind": "Pod",
+                "metadata": self.metadata,
+                "spec": {
+                    "containers": [
+                        {
+                            "args": [],
+                            "command": ["command"],
+                            "env": [],
+                            "envFrom": [],
+                            "name": "base",
+                            "image": "kube_image",
+                            "ports": [],
+                            "resources": {"limits": {"cpu": "1m", "memory": "1G"}},
+                            "volumeMounts": [],
                         }
-                    },
-                    'volumeMounts': []
-                }],
-                'hostNetwork': False,
-                'imagePullSecrets': [],
-                'volumes': []
-            }
-        }, sanitized_result)
+                    ],
+                    "hostNetwork": False,
+                    "imagePullSecrets": [],
+                    "volumes": [],
+                },
+            },
+            sanitized_result,
+        )
 
-    @mock.patch('uuid.uuid4')
+    @mock.patch("uuid.uuid4")
     def test_construct_pod_empty_executor_config(self, mock_uuid):
         mock_uuid.return_value = self.static_uuid
         worker_config = k8s.V1Pod(
             spec=k8s.V1PodSpec(
                 containers=[
                     k8s.V1Container(
-                        name='',
+                        name="",
                         resources=k8s.V1ResourceRequirements(
-                            limits={
-                                'cpu': '1m',
-                                'memory': '1G'
-                            }
-                        )
+                            limits={"cpu": "1m", "memory": "1G"}
+                        ),
                     )
                 ]
             )
@@ -716,225 +770,203 @@ class TestPodGenerator(unittest.TestCase):
         executor_config = None
 
         result = PodGenerator.construct_pod(
-            'dag_id',
-            'task_id',
-            'pod_id',
+            "dag_id",
+            "task_id",
+            "pod_id",
             self.try_number,
             "kube_image",
             self.execution_date,
-            ['command'],
+            ["command"],
             executor_config,
             worker_config,
-            'namespace',
-            'uuid',
+            "namespace",
+            "uuid",
         )
         sanitized_result = self.k8s_client.sanitize_for_serialization(result)
 
-        self.assertEqual({
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': self.metadata,
-            'spec': {
-                'containers': [{
-                    'args': [],
-                    'command': ['command'],
-                    'env': [],
-                    'envFrom': [],
-                    'name': 'base',
-                    'image': 'kube_image',
-                    'ports': [],
-                    'resources': {
-                        'limits': {
-                            'cpu': '1m',
-                            'memory': '1G'
+        self.assertEqual(
+            {
+                "apiVersion": "v1",
+                "kind": "Pod",
+                "metadata": self.metadata,
+                "spec": {
+                    "containers": [
+                        {
+                            "args": [],
+                            "command": ["command"],
+                            "env": [],
+                            "envFrom": [],
+                            "name": "base",
+                            "image": "kube_image",
+                            "ports": [],
+                            "resources": {"limits": {"cpu": "1m", "memory": "1G"}},
+                            "volumeMounts": [],
                         }
-                    },
-                    'volumeMounts': []
-                }],
-                'hostNetwork': False,
-                'imagePullSecrets': [],
-                'volumes': []
-            }
-        }, sanitized_result)
+                    ],
+                    "hostNetwork": False,
+                    "imagePullSecrets": [],
+                    "volumes": [],
+                },
+            },
+            sanitized_result,
+        )
 
-    @mock.patch('uuid.uuid4')
+    @mock.patch("uuid.uuid4")
     def test_construct_pod(self, mock_uuid):
         mock_uuid.return_value = self.static_uuid
         worker_config = k8s.V1Pod(
             metadata=k8s.V1ObjectMeta(
-                name='gets-overridden-by-dynamic-args',
-                annotations={
-                    'should': 'stay'
-                }
+                name="gets-overridden-by-dynamic-args", annotations={"should": "stay"}
             ),
             spec=k8s.V1PodSpec(
                 containers=[
                     k8s.V1Container(
-                        name='doesnt-override',
+                        name="doesnt-override",
                         resources=k8s.V1ResourceRequirements(
-                            limits={
-                                'cpu': '1m',
-                                'memory': '1G'
-                            }
+                            limits={"cpu": "1m", "memory": "1G"}
                         ),
-                        security_context=k8s.V1SecurityContext(
-                            run_as_user=1
-                        )
+                        security_context=k8s.V1SecurityContext(run_as_user=1),
                     )
                 ]
-            )
+            ),
         )
         executor_config = k8s.V1Pod(
             spec=k8s.V1PodSpec(
                 containers=[
                     k8s.V1Container(
-                        name='doesnt-override-either',
+                        name="doesnt-override-either",
                         resources=k8s.V1ResourceRequirements(
-                            limits={
-                                'cpu': '2m',
-                                'memory': '2G'
-                            }
-                        )
+                            limits={"cpu": "2m", "memory": "2G"}
+                        ),
                     )
                 ]
             )
         )
 
         result = PodGenerator.construct_pod(
-            'dag_id',
-            'task_id',
-            'pod_id',
+            "dag_id",
+            "task_id",
+            "pod_id",
             self.try_number,
             "kube_image",
             self.execution_date,
-            ['command'],
+            ["command"],
             executor_config,
             worker_config,
-            'namespace',
-            'uuid',
+            "namespace",
+            "uuid",
         )
         sanitized_result = self.k8s_client.sanitize_for_serialization(result)
 
-        self.metadata['annotations']['should'] = 'stay'
-
-        self.assertEqual({
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': self.metadata,
-            'spec': {
-                'containers': [{
-                    'args': [],
-                    'command': ['command'],
-                    'env': [],
-                    'envFrom': [],
-                    'image': 'kube_image',
-                    'name': 'base',
-                    'ports': [],
-                    'resources': {
-                        'limits': {
-                            'cpu': '2m',
-                            'memory': '2G'
+        self.metadata["annotations"]["should"] = "stay"
+
+        self.assertEqual(
+            {
+                "apiVersion": "v1",
+                "kind": "Pod",
+                "metadata": self.metadata,
+                "spec": {
+                    "containers": [
+                        {
+                            "args": [],
+                            "command": ["command"],
+                            "env": [],
+                            "envFrom": [],
+                            "image": "kube_image",
+                            "name": "base",
+                            "ports": [],
+                            "resources": {"limits": {"cpu": "2m", "memory": "2G"}},
+                            "volumeMounts": [],
+                            "securityContext": {"runAsUser": 1},
                         }
-                    },
-                    'volumeMounts': [],
-                    'securityContext': {'runAsUser': 1}
-                }],
-                'hostNetwork': False,
-                'imagePullSecrets': [],
-                'volumes': []
-            }
-        }, sanitized_result)
+                    ],
+                    "hostNetwork": False,
+                    "imagePullSecrets": [],
+                    "volumes": [],
+                },
+            },
+            sanitized_result,
+        )
 
-    @mock.patch('uuid.uuid4')
+    @mock.patch("uuid.uuid4")
     def test_construct_pod_with_mutation(self, mock_uuid):
         mock_uuid.return_value = self.static_uuid
         worker_config = k8s.V1Pod(
             metadata=k8s.V1ObjectMeta(
-                name='gets-overridden-by-dynamic-args',
-                annotations={
-                    'should': 'stay'
-                }
+                name="gets-overridden-by-dynamic-args", annotations={"should": "stay"}
             ),
             spec=k8s.V1PodSpec(
                 containers=[
                     k8s.V1Container(
-                        name='doesnt-override',
+                        name="doesnt-override",
                         resources=k8s.V1ResourceRequirements(
-                            limits={
-                                'cpu': '1m',
-                                'memory': '1G'
-                            }
+                            limits={"cpu": "1m", "memory": "1G"}
                         ),
-                        security_context=k8s.V1SecurityContext(
-                            run_as_user=1
-                        )
+                        security_context=k8s.V1SecurityContext(run_as_user=1),
                     )
                 ]
-            )
+            ),
         )
         executor_config = k8s.V1Pod(
             spec=k8s.V1PodSpec(
                 containers=[
                     k8s.V1Container(
-                        name='doesnt-override-either',
+                        name="doesnt-override-either",
                         resources=k8s.V1ResourceRequirements(
-                            limits={
-                                'cpu': '2m',
-                                'memory': '2G'
-                            }
-                        )
+                            limits={"cpu": "2m", "memory": "2G"}
+                        ),
                     )
                 ]
             )
         )
 
         result = PodGenerator.construct_pod(
-            dag_id='dag_id',
-            task_id='task_id',
-            pod_id='pod_id',
+            dag_id="dag_id",
+            task_id="task_id",
+            pod_id="pod_id",
             try_number=3,
-            kube_image='kube_image',
+            kube_image="kube_image",
             date=self.execution_date,
-            command=['command'],
+            command=["command"],
             pod_override_object=executor_config,
             base_worker_pod=worker_config,
-            namespace='namespace',
-            worker_uuid='uuid',
+            namespace="namespace",
+            worker_uuid="uuid",
         )
         sanitized_result = self.k8s_client.sanitize_for_serialization(result)
 
-        self.metadata['annotations']['should'] = 'stay'
-
-        self.assertEqual({
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': self.metadata,
-            'spec': {
-                'containers': [{
-                    'args': [],
-                    'command': ['command'],
-                    'env': [],
-                    'envFrom': [],
-                    'name': 'base',
-                    'image': 'kube_image',
-                    'ports': [],
-                    'resources': {
-                        'limits': {
-                            'cpu': '2m',
-                            'memory': '2G'
+        self.metadata["annotations"]["should"] = "stay"
+
+        self.assertEqual(
+            {
+                "apiVersion": "v1",
+                "kind": "Pod",
+                "metadata": self.metadata,
+                "spec": {
+                    "containers": [
+                        {
+                            "args": [],
+                            "command": ["command"],
+                            "env": [],
+                            "envFrom": [],
+                            "name": "base",
+                            "image": "kube_image",
+                            "ports": [],
+                            "resources": {"limits": {"cpu": "2m", "memory": "2G"}},
+                            "volumeMounts": [],
+                            "securityContext": {"runAsUser": 1},
                         }
-                    },
-                    'volumeMounts': [],
-                    'securityContext': {'runAsUser': 1}
-                }],
-                'hostNetwork': False,
-                'imagePullSecrets': [],
-                'volumes': []
-            }
-        }, sanitized_result)
+                    ],
+                    "hostNetwork": False,
+                    "imagePullSecrets": [],
+                    "volumes": [],
+                },
+            },
+            sanitized_result,
+        )
 
     def test_merge_objects_empty(self):
-        annotations = {'foo1': 'bar1'}
+        annotations = {"foo1": "bar1"}
         base_obj = k8s.V1ObjectMeta(annotations=annotations)
         client_obj = None
         res = merge_objects(base_obj, client_obj)
@@ -954,57 +986,54 @@ class TestPodGenerator(unittest.TestCase):
         self.assertEqual(client_obj, res)
 
     def test_merge_objects(self):
-        base_annotations = {'foo1': 'bar1'}
-        base_labels = {'foo1': 'bar1'}
-        client_annotations = {'foo2': 'bar2'}
-        base_obj = k8s.V1ObjectMeta(
-            annotations=base_annotations,
-            labels=base_labels
-        )
+        base_annotations = {"foo1": "bar1"}
+        base_labels = {"foo1": "bar1"}
+        client_annotations = {"foo2": "bar2"}
+        base_obj = k8s.V1ObjectMeta(annotations=base_annotations, labels=base_labels)
         client_obj = k8s.V1ObjectMeta(annotations=client_annotations)
         res = merge_objects(base_obj, client_obj)
         client_obj.labels = base_labels
         self.assertEqual(client_obj, res)
 
     def test_extend_object_field_empty(self):
-        ports = [k8s.V1ContainerPort(container_port=1, name='port')]
-        base_obj = k8s.V1Container(name='base_container', ports=ports)
-        client_obj = k8s.V1Container(name='client_container')
-        res = extend_object_field(base_obj, client_obj, 'ports')
+        ports = [k8s.V1ContainerPort(container_port=1, name="port")]
+        base_obj = k8s.V1Container(name="base_container", ports=ports)
+        client_obj = k8s.V1Container(name="client_container")
+        res = extend_object_field(base_obj, client_obj, "ports")
         client_obj.ports = ports
         self.assertEqual(client_obj, res)
 
-        base_obj = k8s.V1Container(name='base_container')
-        client_obj = k8s.V1Container(name='base_container', ports=ports)
-        res = extend_object_field(base_obj, client_obj, 'ports')
+        base_obj = k8s.V1Container(name="base_container")
+        client_obj = k8s.V1Container(name="base_container", ports=ports)
+        res = extend_object_field(base_obj, client_obj, "ports")
         self.assertEqual(client_obj, res)
 
     def test_extend_object_field_not_list(self):
-        base_obj = k8s.V1Container(name='base_container', image='image')
-        client_obj = k8s.V1Container(name='client_container')
+        base_obj = k8s.V1Container(name="base_container", image="image")
+        client_obj = k8s.V1Container(name="client_container")
         with self.assertRaises(ValueError):
-            extend_object_field(base_obj, client_obj, 'image')
-        base_obj = k8s.V1Container(name='base_container')
-        client_obj = k8s.V1Container(name='client_container', image='image')
+            extend_object_field(base_obj, client_obj, "image")
+        base_obj = k8s.V1Container(name="base_container")
+        client_obj = k8s.V1Container(name="client_container", image="image")
         with self.assertRaises(ValueError):
-            extend_object_field(base_obj, client_obj, 'image')
+            extend_object_field(base_obj, client_obj, "image")
 
     def test_extend_object_field(self):
-        base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
-        base_obj = k8s.V1Container(name='base_container', ports=base_ports)
-        client_ports = [k8s.V1ContainerPort(container_port=1, name='client_port')]
-        client_obj = k8s.V1Container(name='client_container', ports=client_ports)
-        res = extend_object_field(base_obj, client_obj, 'ports')
+        base_ports = [k8s.V1ContainerPort(container_port=1, name="base_port")]
+        base_obj = k8s.V1Container(name="base_container", ports=base_ports)
+        client_ports = [k8s.V1ContainerPort(container_port=1, name="client_port")]
+        client_obj = k8s.V1Container(name="client_container", ports=client_ports)
+        res = extend_object_field(base_obj, client_obj, "ports")
         client_obj.ports = base_ports + client_ports
         self.assertEqual(client_obj, res)
 
     def test_reconcile_containers_empty(self):
-        base_objs = [k8s.V1Container(name='base_container')]
+        base_objs = [k8s.V1Container(name="base_container")]
         client_objs = []
         res = PodGenerator.reconcile_containers(base_objs, client_objs)
         self.assertEqual(base_objs, res)
 
-        client_objs = [k8s.V1Container(name='client_container')]
+        client_objs = [k8s.V1Container(name="client_container")]
         base_objs = []
         res = PodGenerator.reconcile_containers(base_objs, client_objs)
         self.assertEqual(client_objs, res)
@@ -1013,33 +1042,33 @@ class TestPodGenerator(unittest.TestCase):
         self.assertEqual(res, [])
 
     def test_reconcile_containers(self):
-        base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
+        base_ports = [k8s.V1ContainerPort(container_port=1, name="base_port")]
         base_objs = [
-            k8s.V1Container(name='base_container1', ports=base_ports),
-            k8s.V1Container(name='base_container2', image='base_image'),
+            k8s.V1Container(name="base_container1", ports=base_ports),
+            k8s.V1Container(name="base_container2", image="base_image"),
         ]
-        client_ports = [k8s.V1ContainerPort(container_port=2, name='client_port')]
+        client_ports = [k8s.V1ContainerPort(container_port=2, name="client_port")]
         client_objs = [
-            k8s.V1Container(name='client_container1', ports=client_ports),
-            k8s.V1Container(name='client_container2', image='client_image'),
+            k8s.V1Container(name="client_container1", ports=client_ports),
+            k8s.V1Container(name="client_container2", image="client_image"),
         ]
         res = PodGenerator.reconcile_containers(base_objs, client_objs)
         client_objs[0].ports = base_ports + client_ports
         self.assertEqual(client_objs, res)
 
-        base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
+        base_ports = [k8s.V1ContainerPort(container_port=1, name="base_port")]
         base_objs = [
-            k8s.V1Container(name='base_container1', ports=base_ports),
-            k8s.V1Container(name='base_container2', image='base_image'),
+            k8s.V1Container(name="base_container1", ports=base_ports),
+            k8s.V1Container(name="base_container2", image="base_image"),
         ]
-        client_ports = [k8s.V1ContainerPort(container_port=2, name='client_port')]
+        client_ports = [k8s.V1ContainerPort(container_port=2, name="client_port")]
         client_objs = [
-            k8s.V1Container(name='client_container1', ports=client_ports),
-            k8s.V1Container(name='client_container2', stdin=True),
+            k8s.V1Container(name="client_container1", ports=client_ports),
+            k8s.V1Container(name="client_container2", stdin=True),
         ]
         res = PodGenerator.reconcile_containers(base_objs, client_objs)
         client_objs[0].ports = base_ports + client_ports
-        client_objs[1].image = 'base_image'
+        client_objs[1].image = "base_image"
         self.assertEqual(client_objs, res)
 
     def test_reconcile_specs_empty(self):
@@ -1054,17 +1083,23 @@ class TestPodGenerator(unittest.TestCase):
         self.assertEqual(client_spec, res)
 
     def test_reconcile_specs(self):
-        base_objs = [k8s.V1Container(name='base_container1', image='base_image')]
-        client_objs = [k8s.V1Container(name='client_container1')]
-        base_spec = k8s.V1PodSpec(priority=1, active_deadline_seconds=100, containers=base_objs)
-        client_spec = k8s.V1PodSpec(priority=2, hostname='local', containers=client_objs)
+        base_objs = [k8s.V1Container(name="base_container1", image="base_image")]
+        client_objs = [k8s.V1Container(name="client_container1")]
+        base_spec = k8s.V1PodSpec(
+            priority=1, active_deadline_seconds=100, containers=base_objs
+        )
+        client_spec = k8s.V1PodSpec(
+            priority=2, hostname="local", containers=client_objs
+        )
         res = PodGenerator.reconcile_specs(base_spec, client_spec)
-        client_spec.containers = [k8s.V1Container(name='client_container1', image='base_image')]
+        client_spec.containers = [
+            k8s.V1Container(name="client_container1", image="base_image")
+        ]
         client_spec.active_deadline_seconds = 100
         self.assertEqual(client_spec, res)
 
     def test_deserialize_model_file(self):
-        fixture = sys.path[0] + '/tests/kubernetes/pod.yaml'
+        fixture = sys.path[0] + "/tests/kubernetes/pod.yaml"
         result = PodGenerator.deserialize_model_file(fixture)
         sanitized_res = self.k8s_client.sanitize_for_serialization(result)
         self.assertEqual(sanitized_res, self.deserialize_result)
@@ -1107,7 +1142,11 @@ spec:
             command="test",
             pod_override_object=None,
             base_worker_pod=k8s.V1Pod(
-                metadata=k8s.V1ObjectMeta(labels={"airflow-test": "airflow-task-pod"},
-                                          annotations={"my.annotation": "foo"})))
+                metadata=k8s.V1ObjectMeta(
+                    labels={"airflow-test": "airflow-task-pod"},
+                    annotations={"my.annotation": "foo"},
+                )
+            ),
+        )
         self.assertIn("airflow-test", pod.metadata.labels)
         self.assertIn("my.annotation", pod.metadata.annotations)