You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/12/15 05:32:26 UTC
[airflow] 01/01: KubernetesExecutor overrides should only add to
lists
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch fix-env-from-1-10
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 3a5663e43931903cee02abbea5193a6a8cb9e9e1
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Mon Dec 14 21:22:17 2020 -0800
KubernetesExecutor overrides should only add to lists
This PR makes 1.10 interaction more similar to that of Airflow 2.0.
Essentially users are able to override values that are in maps, but when
it comes to lists in k8s objects, it is too complicated to consistently
override.
---
airflow/kubernetes/pod_generator.py | 225 +++--
tests/kubernetes/test_pod_generator.py | 1453 ++++++++++++++++----------------
2 files changed, 848 insertions(+), 830 deletions(-)
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 4df3198..8f0d141 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -34,7 +34,6 @@ from dateutil import parser
from kubernetes.client.api_client import ApiClient
from airflow.contrib.kubernetes.pod import _extract_volume_mounts
-from airflow.exceptions import AirflowConfigException
from airflow.version import version as airflow_version
MAX_LABEL_LEN = 63
@@ -50,21 +49,15 @@ class PodDefaults(object):
def __init__(self):
pass
- XCOM_MOUNT_PATH = '/airflow/xcom'
- SIDECAR_CONTAINER_NAME = 'airflow-xcom-sidecar'
+ XCOM_MOUNT_PATH = "/airflow/xcom"
+ SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar"
XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 30; done;'
- VOLUME_MOUNT = k8s.V1VolumeMount(
- name='xcom',
- mount_path=XCOM_MOUNT_PATH
- )
- VOLUME = k8s.V1Volume(
- name='xcom',
- empty_dir=k8s.V1EmptyDirVolumeSource()
- )
+ VOLUME_MOUNT = k8s.V1VolumeMount(name="xcom", mount_path=XCOM_MOUNT_PATH)
+ VOLUME = k8s.V1Volume(name="xcom", empty_dir=k8s.V1EmptyDirVolumeSource())
SIDECAR_CONTAINER = k8s.V1Container(
name=SIDECAR_CONTAINER_NAME,
- command=['sh', '-c', XCOM_CMD],
- image='alpine',
+ command=["sh", "-c", XCOM_CMD],
+ image="alpine",
volume_mounts=[VOLUME_MOUNT],
resources=k8s.V1ResourceRequirements(
requests={
@@ -88,7 +81,7 @@ def make_safe_label_value(string):
if len(safe_label) > MAX_LABEL_LEN or string != safe_label:
safe_hash = hashlib.md5(string.encode()).hexdigest()[:9]
- safe_label = safe_label[:MAX_LABEL_LEN - len(safe_hash) - 1] + "-" + safe_hash
+ safe_label = safe_label[: MAX_LABEL_LEN - len(safe_hash) - 1] + "-" + safe_hash
return safe_label
@@ -102,7 +95,7 @@ def datetime_to_label_safe_datestring(datetime_obj):
:param datetime_obj: datetime.datetime object
:return: ISO-like string representing the datetime
"""
- return datetime_obj.isoformat().replace(":", "_").replace('+', '_plus_')
+ return datetime_obj.isoformat().replace(":", "_").replace("+", "_plus_")
def label_safe_datestring_to_datetime(string):
@@ -114,7 +107,7 @@ def label_safe_datestring_to_datetime(string):
:param string: str
:return: datetime.datetime object
"""
- return parser.parse(string.replace('_plus_', '+').replace("_", ":"))
+ return parser.parse(string.replace("_plus_", "+").replace("_", ":"))
class PodGenerator(object):
@@ -230,8 +223,8 @@ class PodGenerator(object):
self.ud_pod = pod
self.pod = k8s.V1Pod()
- self.pod.api_version = 'v1'
- self.pod.kind = 'Pod'
+ self.pod.api_version = "v1"
+ self.pod.kind = "Pod"
# Pod Metadata
self.metadata = k8s.V1ObjectMeta()
@@ -241,35 +234,34 @@ class PodGenerator(object):
self.metadata.annotations = annotations
# Pod Container
- self.container = k8s.V1Container(name='base')
+ self.container = k8s.V1Container(name="base")
self.container.image = image
self.container.env = []
if envs:
if isinstance(envs, dict):
for key, val in envs.items():
- self.container.env.append(k8s.V1EnvVar(
- name=key,
- value=val
- ))
+ self.container.env.append(k8s.V1EnvVar(name=key, value=val))
elif isinstance(envs, list):
self.container.env.extend(envs)
configmaps = configmaps or []
self.container.env_from = []
for configmap in configmaps:
- self.container.env_from.append(k8s.V1EnvFromSource(
- config_map_ref=k8s.V1ConfigMapEnvSource(
- name=configmap
+ self.container.env_from.append(
+ k8s.V1EnvFromSource(
+ config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap)
)
- ))
+ )
self.container.command = cmds or []
self.container.args = args or []
self.container.image_pull_policy = image_pull_policy
self.container.ports = ports or []
self.container.resources = resources
- self.container.volume_mounts = [v.to_k8s_client_obj() for v in _extract_volume_mounts(volume_mounts)]
+ self.container.volume_mounts = [
+ v.to_k8s_client_obj() for v in _extract_volume_mounts(volume_mounts)
+ ]
# Pod Spec
self.spec = k8s.V1PodSpec(containers=[])
@@ -288,10 +280,10 @@ class PodGenerator(object):
self.spec.image_pull_secrets = []
if image_pull_secrets:
- for image_pull_secret in image_pull_secrets.split(','):
- self.spec.image_pull_secrets.append(k8s.V1LocalObjectReference(
- name=image_pull_secret
- ))
+ for image_pull_secret in image_pull_secrets.split(","):
+ self.spec.image_pull_secrets.append(
+ k8s.V1LocalObjectReference(name=image_pull_secret)
+ )
# Attach sidecar
self.extract_xcom = extract_xcom
@@ -325,7 +317,7 @@ class PodGenerator(object):
return None
safe_uuid = uuid.uuid4().hex
- safe_pod_id = dag_id[:MAX_POD_ID_LEN - len(safe_uuid) - 1]
+ safe_pod_id = dag_id[: MAX_POD_ID_LEN - len(safe_uuid) - 1]
safe_pod_id = safe_pod_id + "-" + safe_uuid
return safe_pod_id
@@ -335,7 +327,9 @@ class PodGenerator(object):
pod_cp = copy.deepcopy(pod)
pod_cp.spec.volumes = pod.spec.volumes or []
pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)
- pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or []
+ pod_cp.spec.containers[0].volume_mounts = (
+ pod_cp.spec.containers[0].volume_mounts or []
+ )
pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT)
pod_cp.spec.containers.append(PodDefaults.SIDECAR_CONTAINER)
@@ -351,8 +345,9 @@ class PodGenerator(object):
if not isinstance(obj, dict):
raise TypeError(
- 'Cannot convert a non-dictionary or non-PodGenerator '
- 'object into a KubernetesExecutorConfig')
+ "Cannot convert a non-dictionary or non-PodGenerator "
+ "object into a KubernetesExecutorConfig"
+ )
# We do not want to extract constant here from ExecutorLoader because it is just
# A name in dictionary rather than executor selection mechanism and it causes cyclic import
@@ -361,50 +356,55 @@ class PodGenerator(object):
if not namespaced:
return None
- resources = namespaced.get('resources')
+ resources = namespaced.get("resources")
if resources is None:
+
def extract(cpu, memory, ephemeral_storage, limit_gpu=None):
resources_obj = {
- 'cpu': namespaced.pop(cpu, None),
- 'memory': namespaced.pop(memory, None),
- 'ephemeral-storage': namespaced.pop(ephemeral_storage, None),
+ "cpu": namespaced.pop(cpu, None),
+ "memory": namespaced.pop(memory, None),
+ "ephemeral-storage": namespaced.pop(ephemeral_storage, None),
}
if limit_gpu is not None:
- resources_obj['nvidia.com/gpu'] = namespaced.pop(limit_gpu, None)
+ resources_obj["nvidia.com/gpu"] = namespaced.pop(limit_gpu, None)
- resources_obj = {k: v for k, v in resources_obj.items() if v is not None}
+ resources_obj = {
+ k: v for k, v in resources_obj.items() if v is not None
+ }
if all(r is None for r in resources_obj):
resources_obj = None
return namespaced, resources_obj
- namespaced, requests = extract('request_cpu', 'request_memory', 'request_ephemeral_storage')
- namespaced, limits = extract('limit_cpu', 'limit_memory', 'limit_ephemeral_storage',
- limit_gpu='limit_gpu')
+ namespaced, requests = extract(
+ "request_cpu", "request_memory", "request_ephemeral_storage"
+ )
+ namespaced, limits = extract(
+ "limit_cpu",
+ "limit_memory",
+ "limit_ephemeral_storage",
+ limit_gpu="limit_gpu",
+ )
if requests is None and limits is None:
resources = None
else:
- resources = k8s.V1ResourceRequirements(
- requests=requests,
- limits=limits
- )
+ resources = k8s.V1ResourceRequirements(requests=requests, limits=limits)
elif isinstance(resources, dict):
resources = k8s.V1ResourceRequirements(
- requests=resources['requests'],
- limits=resources['limits']
+ requests=resources["requests"], limits=resources["limits"]
)
- annotations = namespaced.get('annotations', {})
- gcp_service_account_key = namespaced.get('gcp_service_account_key', None)
+ annotations = namespaced.get("annotations", {})
+ gcp_service_account_key = namespaced.get("gcp_service_account_key", None)
if annotations is not None and gcp_service_account_key is not None:
- annotations.update({
- 'iam.cloud.google.com/service-account': gcp_service_account_key
- })
+ annotations.update(
+ {"iam.cloud.google.com/service-account": gcp_service_account_key}
+ )
- namespaced['resources'] = resources
+ namespaced["resources"] = resources
return PodGenerator(**namespaced).gen_pod()
@staticmethod
@@ -426,8 +426,12 @@ class PodGenerator(object):
return base_pod
client_pod_cp = copy.deepcopy(client_pod)
- client_pod_cp.spec = PodGenerator.reconcile_specs(base_pod.spec, client_pod_cp.spec)
- client_pod_cp.metadata = PodGenerator.reconcile_metadata(base_pod.metadata, client_pod_cp.metadata)
+ client_pod_cp.spec = PodGenerator.reconcile_specs(
+ base_pod.spec, client_pod_cp.spec
+ )
+ client_pod_cp.metadata = PodGenerator.reconcile_metadata(
+ base_pod.metadata, client_pod_cp.metadata
+ )
client_pod_cp = merge_objects(base_pod, client_pod_cp)
return client_pod_cp
@@ -448,17 +452,18 @@ class PodGenerator(object):
return client_meta
elif client_meta and base_meta:
client_meta.labels = merge_objects(base_meta.labels, client_meta.labels)
- client_meta.annotations = merge_objects(base_meta.annotations, client_meta.annotations)
- extend_object_field(base_meta, client_meta, 'managed_fields')
- extend_object_field(base_meta, client_meta, 'finalizers')
- extend_object_field(base_meta, client_meta, 'owner_references')
+ client_meta.annotations = merge_objects(
+ base_meta.annotations, client_meta.annotations
+ )
+ extend_object_field(base_meta, client_meta, "managed_fields")
+ extend_object_field(base_meta, client_meta, "finalizers")
+ extend_object_field(base_meta, client_meta, "owner_references")
return merge_objects(base_meta, client_meta)
return None
@staticmethod
- def reconcile_specs(base_spec,
- client_spec):
+ def reconcile_specs(base_spec, client_spec):
"""
:param base_spec: has the base attributes which are overwritten if they exist
in the client_spec and remain if they do not exist in the client_spec
@@ -475,14 +480,13 @@ class PodGenerator(object):
client_spec.containers = PodGenerator.reconcile_containers(
base_spec.containers, client_spec.containers
)
- merged_spec = extend_object_field(base_spec, client_spec, 'volumes')
+ merged_spec = extend_object_field(base_spec, client_spec, "volumes")
return merge_objects(base_spec, merged_spec)
return None
@staticmethod
- def reconcile_containers(base_containers,
- client_containers):
+ def reconcile_containers(base_containers, client_containers):
"""
:param base_containers: has the base attributes which are overwritten if they exist
in the client_containers and remain if they do not exist in the client_containers
@@ -501,14 +505,18 @@ class PodGenerator(object):
client_container = client_containers[0]
base_container = base_containers[0]
client_container = extend_object_field(
- base_container,
- client_container,
- 'volume_mounts',
- 'mount_path')
- client_container = extend_object_field(base_container, client_container, 'env')
- client_container = extend_object_field(base_container, client_container, 'env_from')
- client_container = extend_object_field(base_container, client_container, 'ports')
- client_container = extend_object_field(base_container, client_container, 'volume_devices')
+ base_container, client_container, "volume_mounts"
+ )
+ client_container = extend_object_field(base_container, client_container, "env")
+ client_container = extend_object_field(
+ base_container, client_container, "env_from"
+ )
+ client_container = extend_object_field(
+ base_container, client_container, "ports"
+ )
+ client_container = extend_object_field(
+ base_container, client_container, "volume_devices"
+ )
client_container = merge_objects(base_container, client_container)
return [client_container] + PodGenerator.reconcile_containers(
@@ -527,7 +535,7 @@ class PodGenerator(object):
pod_override_object,
base_worker_pod,
namespace,
- worker_uuid
+ worker_uuid,
):
"""
Construct a pod by gathering and consolidating the configuration from 3 places:
@@ -545,22 +553,22 @@ class PodGenerator(object):
namespace=namespace,
image=image,
labels={
- 'airflow-worker': worker_uuid,
- 'dag_id': make_safe_label_value(dag_id),
- 'task_id': make_safe_label_value(task_id),
- 'execution_date': datetime_to_label_safe_datestring(date),
- 'try_number': str(try_number),
- 'airflow_version': airflow_version.replace('+', '-'),
- 'kubernetes_executor': 'True',
+ "airflow-worker": worker_uuid,
+ "dag_id": make_safe_label_value(dag_id),
+ "task_id": make_safe_label_value(task_id),
+ "execution_date": datetime_to_label_safe_datestring(date),
+ "try_number": str(try_number),
+ "airflow_version": airflow_version.replace("+", "-"),
+ "kubernetes_executor": "True",
},
annotations={
- 'dag_id': dag_id,
- 'task_id': task_id,
- 'execution_date': date.isoformat(),
- 'try_number': str(try_number),
+ "dag_id": dag_id,
+ "task_id": task_id,
+ "execution_date": date.isoformat(),
+ "try_number": str(try_number),
},
cmds=command,
- name=pod_id
+ name=pod_id,
).gen_pod()
# Reconcile the pods starting with the first chronologically,
@@ -627,7 +635,7 @@ def merge_objects(base_obj, client_obj):
return client_obj_cp
-def extend_object_field(base_obj, client_obj, field_name, field_to_merge="name"):
+def extend_object_field(base_obj, client_obj, field_name):
"""
:param base_obj: an object which has a property `field_name` that is a list
:param client_obj: an object which has a property `field_name` that is a list.
@@ -640,8 +648,9 @@ def extend_object_field(base_obj, client_obj, field_name, field_to_merge="name")
base_obj_field = getattr(base_obj, field_name, None)
client_obj_field = getattr(client_obj, field_name, None)
- if (not isinstance(base_obj_field, list) and base_obj_field is not None) or \
- (not isinstance(client_obj_field, list) and client_obj_field is not None):
+ if (not isinstance(base_obj_field, list) and base_obj_field is not None) or (
+ not isinstance(client_obj_field, list) and client_obj_field is not None
+ ):
raise ValueError("The chosen field must be a list.")
if not base_obj_field:
@@ -650,36 +659,6 @@ def extend_object_field(base_obj, client_obj, field_name, field_to_merge="name")
setattr(client_obj_cp, field_name, base_obj_field)
return client_obj_cp
- base_obj_set = _get_dict_from_list(base_obj_field, field_to_merge)
- client_obj_set = _get_dict_from_list(client_obj_field, field_to_merge)
-
- appended_fields = _merge_list_of_objects(base_obj_set, client_obj_set)
-
+ appended_fields = base_obj_field + client_obj_field
setattr(client_obj_cp, field_name, appended_fields)
return client_obj_cp
-
-
-def _merge_list_of_objects(base_obj_set, client_obj_set):
- for k, v in base_obj_set.items():
- if k not in client_obj_set:
- client_obj_set[k] = v
- else:
- client_obj_set[k] = merge_objects(v, client_obj_set[k])
- appended_field_keys = sorted(client_obj_set.keys())
- appended_fields = [client_obj_set[k] for k in appended_field_keys]
- return appended_fields
-
-
-def _get_dict_from_list(base_list, field_to_merge="name"):
- """
- :type base_list: list(Optional[dict, *to_dict])
- """
- result = {}
- for obj in base_list:
- if isinstance(obj, dict):
- result[obj[field_to_merge]] = obj
- elif hasattr(obj, "to_dict"):
- result[getattr(obj, field_to_merge)] = obj
- else:
- raise AirflowConfigException("Trying to merge invalid object {}".format(obj))
- return result
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index fed7c97..df7afdf 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -25,532 +25,574 @@ from kubernetes.client import ApiClient, models as k8s
from airflow.kubernetes.k8s_model import append_to_pod
from airflow.kubernetes.pod import Resources
-from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator, extend_object_field, merge_objects, \
- datetime_to_label_safe_datestring
+from airflow.kubernetes.pod_generator import (
+ PodDefaults,
+ PodGenerator,
+ extend_object_field,
+ merge_objects,
+ datetime_to_label_safe_datestring,
+)
from airflow.kubernetes.secret import Secret
class TestPodGenerator(unittest.TestCase):
-
def setUp(self):
- self.static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48')
+ self.static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48")
self.deserialize_result = {
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': {'name': 'memory-demo', 'namespace': 'mem-example'},
- 'spec': {
- 'containers': [{
- 'args': ['--vm', '1', '--vm-bytes', '150M', '--vm-hang', '1'],
- 'command': ['stress'],
- 'image': 'apache/airflow:stress-2020.07.10-1.0.4',
- 'name': 'memory-demo-ctr',
- 'resources': {
- 'limits': {'memory': '200Mi'},
- 'requests': {'memory': '100Mi'}
+ "apiVersion": "v1",
+ "kind": "Pod",
+ "metadata": {"name": "memory-demo", "namespace": "mem-example"},
+ "spec": {
+ "containers": [
+ {
+ "args": ["--vm", "1", "--vm-bytes", "150M", "--vm-hang", "1"],
+ "command": ["stress"],
+ "image": "apache/airflow:stress-2020.07.10-1.0.4",
+ "name": "memory-demo-ctr",
+ "resources": {
+ "limits": {"memory": "200Mi"},
+ "requests": {"memory": "100Mi"},
+ },
}
- }]
- }
+ ]
+ },
}
- self.envs = {
- 'ENVIRONMENT': 'prod',
- 'LOG_LEVEL': 'warning'
- }
+ self.envs = {"ENVIRONMENT": "prod", "LOG_LEVEL": "warning"}
self.secrets = [
# This should be a secretRef
- Secret('env', None, 'secret_a'),
+ Secret("env", None, "secret_a"),
# This should be a single secret mounted in volumeMounts
- Secret('volume', '/etc/foo', 'secret_b'),
+ Secret("volume", "/etc/foo", "secret_b"),
# This should produce a single secret mounted in env
- Secret('env', 'TARGET', 'secret_b', 'source_b'),
+ Secret("env", "TARGET", "secret_b", "source_b"),
]
- self.execution_date = parser.parse('2020-08-24 00:00:00.000000')
- self.execution_date_label = datetime_to_label_safe_datestring(self.execution_date)
- self.dag_id = 'dag_id'
- self.task_id = 'task_id'
+ self.execution_date = parser.parse("2020-08-24 00:00:00.000000")
+ self.execution_date_label = datetime_to_label_safe_datestring(
+ self.execution_date
+ )
+ self.dag_id = "dag_id"
+ self.task_id = "task_id"
self.try_number = 3
self.labels = {
- 'airflow-worker': 'uuid',
- 'dag_id': 'dag_id',
- 'execution_date': mock.ANY,
- 'task_id': 'task_id',
- 'try_number': '3',
- 'airflow_version': mock.ANY,
- 'kubernetes_executor': 'True'
+ "airflow-worker": "uuid",
+ "dag_id": "dag_id",
+ "execution_date": mock.ANY,
+ "task_id": "task_id",
+ "try_number": "3",
+ "airflow_version": mock.ANY,
+ "kubernetes_executor": "True",
}
self.metadata = {
- 'annotations': {'dag_id': 'dag_id',
- 'execution_date': '2020-08-24T00:00:00',
- 'task_id': 'task_id',
- 'try_number': '3'},
- 'labels': self.labels,
- 'name': 'pod_id-' + self.static_uuid.hex,
- 'namespace': 'namespace'
+ "annotations": {
+ "dag_id": "dag_id",
+ "execution_date": "2020-08-24T00:00:00",
+ "task_id": "task_id",
+ "try_number": "3",
+ },
+ "labels": self.labels,
+ "name": "pod_id-" + self.static_uuid.hex,
+ "namespace": "namespace",
}
- self.resources = Resources('1Gi', 1, '2Gi', '2Gi', 2, 1, '4Gi')
+ self.resources = Resources("1Gi", 1, "2Gi", "2Gi", 2, 1, "4Gi")
self.k8s_client = ApiClient()
self.expected = {
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': {
- 'name': 'myapp-pod-' + self.static_uuid.hex,
- 'labels': {'app': 'myapp'},
- 'namespace': 'default'
+ "apiVersion": "v1",
+ "kind": "Pod",
+ "metadata": {
+ "name": "myapp-pod-" + self.static_uuid.hex,
+ "labels": {"app": "myapp"},
+ "namespace": "default",
},
- 'spec': {
- 'containers': [{
- 'name': 'base',
- 'image': 'busybox',
- 'args': [],
- 'command': [
- 'sh', '-c', 'echo Hello Kubernetes!'
- ],
- 'env': [{
- 'name': 'ENVIRONMENT',
- 'value': 'prod'
- }, {
- 'name': 'LOG_LEVEL',
- 'value': 'warning'
- }, {
- 'name': 'TARGET',
- 'valueFrom': {
- 'secretKeyRef': {
- 'name': 'secret_b',
- 'key': 'source_b'
- }
- }
- }],
- 'envFrom': [{
- 'configMapRef': {
- 'name': 'configmap_a'
- }
- }, {
- 'configMapRef': {
- 'name': 'configmap_b'
- }
- }, {
- 'secretRef': {
- 'name': 'secret_a'
- }
- }],
- 'resources': {
- 'requests': {
- 'memory': '1Gi',
- 'cpu': 1,
- 'ephemeral-storage': '2Gi'
- },
- 'limits': {
- 'memory': '2Gi',
- 'cpu': 2,
- 'nvidia.com/gpu': 1,
- 'ephemeral-storage': '4Gi'
+ "spec": {
+ "containers": [
+ {
+ "name": "base",
+ "image": "busybox",
+ "args": [],
+ "command": ["sh", "-c", "echo Hello Kubernetes!"],
+ "env": [
+ {"name": "ENVIRONMENT", "value": "prod"},
+ {"name": "LOG_LEVEL", "value": "warning"},
+ {
+ "name": "TARGET",
+ "valueFrom": {
+ "secretKeyRef": {
+ "name": "secret_b",
+ "key": "source_b",
+ }
+ },
+ },
+ ],
+ "envFrom": [
+ {"configMapRef": {"name": "configmap_a"}},
+ {"configMapRef": {"name": "configmap_b"}},
+ {"secretRef": {"name": "secret_a"}},
+ ],
+ "resources": {
+ "requests": {
+ "memory": "1Gi",
+ "cpu": 1,
+ "ephemeral-storage": "2Gi",
+ },
+ "limits": {
+ "memory": "2Gi",
+ "cpu": 2,
+ "nvidia.com/gpu": 1,
+ "ephemeral-storage": "4Gi",
+ },
},
- },
- 'ports': [{'name': 'foo', 'containerPort': 1234}],
- 'volumeMounts': [{
- 'mountPath': '/etc/foo',
- 'name': 'secretvol' + str(self.static_uuid),
- 'readOnly': True
- }]
- }],
- 'volumes': [{
- 'name': 'secretvol' + str(self.static_uuid),
- 'secret': {
- 'secretName': 'secret_b'
+ "ports": [{"name": "foo", "containerPort": 1234}],
+ "volumeMounts": [
+ {
+ "mountPath": "/etc/foo",
+ "name": "secretvol" + str(self.static_uuid),
+ "readOnly": True,
+ }
+ ],
}
- }],
- 'hostNetwork': False,
- 'imagePullSecrets': [
- {'name': 'pull_secret_a'},
- {'name': 'pull_secret_b'}
],
- 'securityContext': {
- 'runAsUser': 1000,
- 'fsGroup': 2000,
+ "volumes": [
+ {
+ "name": "secretvol" + str(self.static_uuid),
+ "secret": {"secretName": "secret_b"},
+ }
+ ],
+ "hostNetwork": False,
+ "imagePullSecrets": [
+ {"name": "pull_secret_a"},
+ {"name": "pull_secret_b"},
+ ],
+ "securityContext": {
+ "runAsUser": 1000,
+ "fsGroup": 2000,
},
- }
+ },
}
- @mock.patch('uuid.uuid4')
+ @mock.patch("uuid.uuid4")
def test_gen_pod(self, mock_uuid):
mock_uuid.return_value = self.static_uuid
pod_generator = PodGenerator(
- labels={'app': 'myapp'},
- name='myapp-pod',
- image_pull_secrets='pull_secret_a,pull_secret_b',
- image='busybox',
+ labels={"app": "myapp"},
+ name="myapp-pod",
+ image_pull_secrets="pull_secret_a,pull_secret_b",
+ image="busybox",
envs=self.envs,
- cmds=['sh', '-c', 'echo Hello Kubernetes!'],
+ cmds=["sh", "-c", "echo Hello Kubernetes!"],
security_context=k8s.V1PodSecurityContext(
run_as_user=1000,
fs_group=2000,
),
- namespace='default',
- ports=[k8s.V1ContainerPort(name='foo', container_port=1234)],
- configmaps=['configmap_a', 'configmap_b']
+ namespace="default",
+ ports=[k8s.V1ContainerPort(name="foo", container_port=1234)],
+ configmaps=["configmap_a", "configmap_b"],
)
result = pod_generator.gen_pod()
result = append_to_pod(result, self.secrets)
result = self.resources.attach_to_pod(result)
result_dict = self.k8s_client.sanitize_for_serialization(result)
# sort
- result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
- result_dict['spec']['containers'][0]['envFrom'].sort(
- key=lambda x: list(x.values())[0]['name']
+ result_dict["spec"]["containers"][0]["env"].sort(key=lambda x: x["name"])
+ result_dict["spec"]["containers"][0]["envFrom"].sort(
+ key=lambda x: list(x.values())[0]["name"]
)
self.assertDictEqual(self.expected, result_dict)
- @mock.patch('uuid.uuid4')
+ @mock.patch("uuid.uuid4")
def test_gen_pod_extract_xcom(self, mock_uuid):
mock_uuid.return_value = self.static_uuid
pod_generator = PodGenerator(
- labels={'app': 'myapp'},
- name='myapp-pod',
- image_pull_secrets='pull_secret_a,pull_secret_b',
- image='busybox',
+ labels={"app": "myapp"},
+ name="myapp-pod",
+ image_pull_secrets="pull_secret_a,pull_secret_b",
+ image="busybox",
envs=self.envs,
- cmds=['sh', '-c', 'echo Hello Kubernetes!'],
- namespace='default',
+ cmds=["sh", "-c", "echo Hello Kubernetes!"],
+ namespace="default",
security_context=k8s.V1PodSecurityContext(
run_as_user=1000,
fs_group=2000,
),
- ports=[k8s.V1ContainerPort(name='foo', container_port=1234)],
- configmaps=['configmap_a', 'configmap_b'],
- extract_xcom=True
+ ports=[k8s.V1ContainerPort(name="foo", container_port=1234)],
+ configmaps=["configmap_a", "configmap_b"],
+ extract_xcom=True,
)
result = pod_generator.gen_pod()
result = append_to_pod(result, self.secrets)
result = self.resources.attach_to_pod(result)
result_dict = self.k8s_client.sanitize_for_serialization(result)
container_two = {
- 'name': 'airflow-xcom-sidecar',
- 'image': "alpine",
- 'command': ['sh', '-c', PodDefaults.XCOM_CMD],
- 'volumeMounts': [
- {
- 'name': 'xcom',
- 'mountPath': '/airflow/xcom'
- }
- ],
- 'resources': {'requests': {'cpu': '1m'}},
+ "name": "airflow-xcom-sidecar",
+ "image": "alpine",
+ "command": ["sh", "-c", PodDefaults.XCOM_CMD],
+ "volumeMounts": [{"name": "xcom", "mountPath": "/airflow/xcom"}],
+ "resources": {"requests": {"cpu": "1m"}},
}
- self.expected['spec']['containers'].append(container_two)
- self.expected['spec']['containers'][0]['volumeMounts'].insert(0, {
- 'name': 'xcom',
- 'mountPath': '/airflow/xcom'
- })
- self.expected['spec']['volumes'].insert(0, {
- 'name': 'xcom', 'emptyDir': {}
- })
- result_dict['spec']['containers'][0]['env'].sort(key=lambda x: x['name'])
+ self.expected["spec"]["containers"].append(container_two)
+ self.expected["spec"]["containers"][0]["volumeMounts"].insert(
+ 0, {"name": "xcom", "mountPath": "/airflow/xcom"}
+ )
+ self.expected["spec"]["volumes"].insert(0, {"name": "xcom", "emptyDir": {}})
+ result_dict["spec"]["containers"][0]["env"].sort(key=lambda x: x["name"])
self.assertEqual(result_dict, self.expected)
- @mock.patch('uuid.uuid4')
+ @mock.patch("uuid.uuid4")
def test_from_obj(self, mock_uuid):
mock_uuid.return_value = self.static_uuid
- result = PodGenerator.from_obj({
- "KubernetesExecutor": {
- "annotations": {"test": "annotation"},
- "volumes": [
- {
- "name": "example-kubernetes-test-volume",
- "hostPath": {"path": "/tmp/"},
- },
- ],
- "volume_mounts": [
- {
- "mountPath": "/foo/",
- "name": "example-kubernetes-test-volume",
- },
- ],
- "resources": {
- "requests": {
- "memory": "256Mi",
- "cpu": "500m",
- "ephemeral-storage": "2G",
- "nvidia.com/gpu": "0"
+ result = PodGenerator.from_obj(
+ {
+ "KubernetesExecutor": {
+ "annotations": {"test": "annotation"},
+ "volumes": [
+ {
+ "name": "example-kubernetes-test-volume",
+ "hostPath": {"path": "/tmp/"},
+ },
+ ],
+ "volume_mounts": [
+ {
+ "mountPath": "/foo/",
+ "name": "example-kubernetes-test-volume",
+ },
+ ],
+ "resources": {
+ "requests": {
+ "memory": "256Mi",
+ "cpu": "500m",
+ "ephemeral-storage": "2G",
+ "nvidia.com/gpu": "0",
+ },
+ "limits": {
+ "memory": "512Mi",
+ "cpu": "1000m",
+ "ephemeral-storage": "2G",
+ "nvidia.com/gpu": "0",
+ },
},
- "limits": {
- "memory": "512Mi",
- "cpu": "1000m",
- "ephemeral-storage": "2G",
- "nvidia.com/gpu": "0"
- }
}
}
- })
+ )
result = self.k8s_client.sanitize_for_serialization(result)
- self.assertEqual({
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': {
- 'annotations': {'test': 'annotation'},
+ self.assertEqual(
+ {
+ "apiVersion": "v1",
+ "kind": "Pod",
+ "metadata": {
+ "annotations": {"test": "annotation"},
+ },
+ "spec": {
+ "containers": [
+ {
+ "args": [],
+ "command": [],
+ "env": [],
+ "envFrom": [],
+ "name": "base",
+ "ports": [],
+ "volumeMounts": [
+ {
+ "mountPath": "/foo/",
+ "name": "example-kubernetes-test-volume",
+ }
+ ],
+ "resources": {
+ "requests": {
+ "memory": "256Mi",
+ "cpu": "500m",
+ "ephemeral-storage": "2G",
+ "nvidia.com/gpu": "0",
+ },
+ "limits": {
+ "memory": "512Mi",
+ "cpu": "1000m",
+ "ephemeral-storage": "2G",
+ "nvidia.com/gpu": "0",
+ },
+ },
+ }
+ ],
+ "hostNetwork": False,
+ "imagePullSecrets": [],
+ "volumes": [
+ {
+ "hostPath": {"path": "/tmp/"},
+ "name": "example-kubernetes-test-volume",
+ }
+ ],
+ },
},
- 'spec': {
- 'containers': [{
- 'args': [],
- 'command': [],
- 'env': [],
- 'envFrom': [],
- 'name': 'base',
- 'ports': [],
- 'volumeMounts': [{
- 'mountPath': '/foo/',
- 'name': 'example-kubernetes-test-volume'
- }],
+ result,
+ )
+
+ @mock.patch("uuid.uuid4")
+ def test_from_obj_with_resources_object(self, mock_uuid):
+ mock_uuid.return_value = self.static_uuid
+ result = PodGenerator.from_obj(
+ {
+ "KubernetesExecutor": {
+ "annotations": {"test": "annotation"},
+ "volumes": [
+ {
+ "name": "example-kubernetes-test-volume",
+ "hostPath": {"path": "/tmp/"},
+ },
+ ],
+ "volume_mounts": [
+ {
+ "mountPath": "/foo/",
+ "name": "example-kubernetes-test-volume",
+ },
+ ],
"resources": {
"requests": {
"memory": "256Mi",
"cpu": "500m",
"ephemeral-storage": "2G",
- "nvidia.com/gpu": "0"
+ "nvidia.com/gpu": "0",
},
"limits": {
"memory": "512Mi",
"cpu": "1000m",
"ephemeral-storage": "2G",
- "nvidia.com/gpu": "0"
- }
- }
- }],
- 'hostNetwork': False,
- 'imagePullSecrets': [],
- 'volumes': [{
- 'hostPath': {'path': '/tmp/'},
- 'name': 'example-kubernetes-test-volume'
- }],
- }
- }, result)
-
- @mock.patch('uuid.uuid4')
- def test_from_obj_with_resources_object(self, mock_uuid):
- mock_uuid.return_value = self.static_uuid
- result = PodGenerator.from_obj({
- "KubernetesExecutor": {
- "annotations": {"test": "annotation"},
- "volumes": [
- {
- "name": "example-kubernetes-test-volume",
- "hostPath": {"path": "/tmp/"},
- },
- ],
- "volume_mounts": [
- {
- "mountPath": "/foo/",
- "name": "example-kubernetes-test-volume",
- },
- ],
- "resources": {
- "requests": {
- "memory": "256Mi",
- "cpu": "500m",
- "ephemeral-storage": "2G",
- "nvidia.com/gpu": "0"
+ "nvidia.com/gpu": "0",
+ },
},
- "limits": {
- "memory": "512Mi",
- "cpu": "1000m",
- "ephemeral-storage": "2G",
- "nvidia.com/gpu": "0"
- }
}
}
- })
+ )
result = self.k8s_client.sanitize_for_serialization(result)
- self.assertEqual({
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': {
- 'annotations': {'test': 'annotation'},
+ self.assertEqual(
+ {
+ "apiVersion": "v1",
+ "kind": "Pod",
+ "metadata": {
+ "annotations": {"test": "annotation"},
+ },
+ "spec": {
+ "containers": [
+ {
+ "args": [],
+ "command": [],
+ "env": [],
+ "envFrom": [],
+ "name": "base",
+ "ports": [],
+ "volumeMounts": [
+ {
+ "mountPath": "/foo/",
+ "name": "example-kubernetes-test-volume",
+ }
+ ],
+ "resources": {
+ "limits": {
+ "cpu": "1000m",
+ "ephemeral-storage": "2G",
+ "memory": "512Mi",
+ "nvidia.com/gpu": "0",
+ },
+ "requests": {
+ "cpu": "500m",
+ "ephemeral-storage": "2G",
+ "memory": "256Mi",
+ "nvidia.com/gpu": "0",
+ },
+ },
+ }
+ ],
+ "hostNetwork": False,
+ "imagePullSecrets": [],
+ "volumes": [
+ {
+ "hostPath": {"path": "/tmp/"},
+ "name": "example-kubernetes-test-volume",
+ }
+ ],
+ },
},
- 'spec': {
- 'containers': [{
- 'args': [],
- 'command': [],
- 'env': [],
- 'envFrom': [],
- 'name': 'base',
- 'ports': [],
- 'volumeMounts': [{
- 'mountPath': '/foo/',
- 'name': 'example-kubernetes-test-volume'
- }],
- 'resources': {'limits': {'cpu': '1000m',
- 'ephemeral-storage': '2G',
- 'memory': '512Mi',
- 'nvidia.com/gpu': '0'},
- 'requests': {'cpu': '500m',
- 'ephemeral-storage': '2G',
- 'memory': '256Mi',
- 'nvidia.com/gpu': '0'}},
- }],
- 'hostNetwork': False,
- 'imagePullSecrets': [],
- 'volumes': [{
- 'hostPath': {'path': '/tmp/'},
- 'name': 'example-kubernetes-test-volume'
- }],
- }
- }, result)
+ result,
+ )
- @mock.patch('uuid.uuid4')
+ @mock.patch("uuid.uuid4")
def test_from_obj_with_resources(self, mock_uuid):
self.maxDiff = None
mock_uuid.return_value = self.static_uuid
- result = PodGenerator.from_obj({
- "KubernetesExecutor": {
- "annotations": {"test": "annotation"},
- "volumes": [
- {
- "name": "example-kubernetes-test-volume",
- "hostPath": {"path": "/tmp/"},
- },
- ],
- "volume_mounts": [
- {
- "mountPath": "/foo/",
- "name": "example-kubernetes-test-volume",
- },
- ],
- 'request_cpu': "200m",
- 'limit_cpu': "400m",
- 'request_memory': "500Mi",
- 'limit_memory': "1000Mi",
- 'limit_gpu': "2",
- 'request_ephemeral_storage': '2Gi',
- 'limit_ephemeral_storage': '4Gi',
+ result = PodGenerator.from_obj(
+ {
+ "KubernetesExecutor": {
+ "annotations": {"test": "annotation"},
+ "volumes": [
+ {
+ "name": "example-kubernetes-test-volume",
+ "hostPath": {"path": "/tmp/"},
+ },
+ ],
+ "volume_mounts": [
+ {
+ "mountPath": "/foo/",
+ "name": "example-kubernetes-test-volume",
+ },
+ ],
+ "request_cpu": "200m",
+ "limit_cpu": "400m",
+ "request_memory": "500Mi",
+ "limit_memory": "1000Mi",
+ "limit_gpu": "2",
+ "request_ephemeral_storage": "2Gi",
+ "limit_ephemeral_storage": "4Gi",
+ }
}
- })
+ )
result = self.k8s_client.sanitize_for_serialization(result)
- self.assertEqual({
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': {
- 'annotations': {'test': 'annotation'},
+ self.assertEqual(
+ {
+ "apiVersion": "v1",
+ "kind": "Pod",
+ "metadata": {
+ "annotations": {"test": "annotation"},
+ },
+ "spec": {
+ "containers": [
+ {
+ "args": [],
+ "command": [],
+ "env": [],
+ "envFrom": [],
+ "name": "base",
+ "ports": [],
+ "resources": {
+ "limits": {
+ "cpu": "400m",
+ "ephemeral-storage": "4Gi",
+ "memory": "1000Mi",
+ "nvidia.com/gpu": "2",
+ },
+ "requests": {
+ "cpu": "200m",
+ "ephemeral-storage": "2Gi",
+ "memory": "500Mi",
+ },
+ },
+ "volumeMounts": [
+ {
+ "mountPath": "/foo/",
+ "name": "example-kubernetes-test-volume",
+ }
+ ],
+ }
+ ],
+ "hostNetwork": False,
+ "imagePullSecrets": [],
+ "volumes": [
+ {
+ "hostPath": {"path": "/tmp/"},
+ "name": "example-kubernetes-test-volume",
+ }
+ ],
+ },
},
- 'spec': {
- 'containers': [{
- 'args': [],
- 'command': [],
- 'env': [],
- 'envFrom': [],
- 'name': 'base',
- 'ports': [],
- 'resources': {
- 'limits': {
- 'cpu': '400m',
- 'ephemeral-storage': '4Gi',
- 'memory': '1000Mi',
- 'nvidia.com/gpu': "2",
- },
- 'requests': {
- 'cpu': '200m',
- 'ephemeral-storage': '2Gi',
- 'memory': '500Mi',
- },
- },
- 'volumeMounts': [{
- 'mountPath': '/foo/',
- 'name': 'example-kubernetes-test-volume'
- }],
- }],
- 'hostNetwork': False,
- 'imagePullSecrets': [],
- 'volumes': [{
- 'hostPath': {'path': '/tmp/'},
- 'name': 'example-kubernetes-test-volume'
- }],
- }
- }, result)
+ result,
+ )
- @mock.patch('uuid.uuid4')
+ @mock.patch("uuid.uuid4")
def test_from_obj_with_only_request_resources(self, mock_uuid):
self.maxDiff = None
mock_uuid.return_value = self.static_uuid
- result = PodGenerator.from_obj({
- "KubernetesExecutor": {
- "annotations": {"test": "annotation"},
- "volumes": [
- {
- "name": "example-kubernetes-test-volume",
- "hostPath": {"path": "/tmp/"},
- },
- ],
- "volume_mounts": [
- {
- "mountPath": "/foo/",
- "name": "example-kubernetes-test-volume",
- },
- ],
- 'request_cpu': "200m",
- 'request_memory': "500Mi",
+ result = PodGenerator.from_obj(
+ {
+ "KubernetesExecutor": {
+ "annotations": {"test": "annotation"},
+ "volumes": [
+ {
+ "name": "example-kubernetes-test-volume",
+ "hostPath": {"path": "/tmp/"},
+ },
+ ],
+ "volume_mounts": [
+ {
+ "mountPath": "/foo/",
+ "name": "example-kubernetes-test-volume",
+ },
+ ],
+ "request_cpu": "200m",
+ "request_memory": "500Mi",
+ }
}
- })
+ )
result = self.k8s_client.sanitize_for_serialization(result)
- self.assertEqual({
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': {
- 'annotations': {'test': 'annotation'},
+ self.assertEqual(
+ {
+ "apiVersion": "v1",
+ "kind": "Pod",
+ "metadata": {
+ "annotations": {"test": "annotation"},
+ },
+ "spec": {
+ "containers": [
+ {
+ "args": [],
+ "command": [],
+ "env": [],
+ "envFrom": [],
+ "name": "base",
+ "ports": [],
+ "resources": {
+ "requests": {
+ "cpu": "200m",
+ "memory": "500Mi",
+ },
+ },
+ "volumeMounts": [
+ {
+ "mountPath": "/foo/",
+ "name": "example-kubernetes-test-volume",
+ }
+ ],
+ }
+ ],
+ "hostNetwork": False,
+ "imagePullSecrets": [],
+ "volumes": [
+ {
+ "hostPath": {"path": "/tmp/"},
+ "name": "example-kubernetes-test-volume",
+ }
+ ],
+ },
},
- 'spec': {
- 'containers': [{
- 'args': [],
- 'command': [],
- 'env': [],
- 'envFrom': [],
- 'name': 'base',
- 'ports': [],
- 'resources': {
- 'requests': {
- 'cpu': '200m',
- 'memory': '500Mi',
- },
- },
- 'volumeMounts': [{
- 'mountPath': '/foo/',
- 'name': 'example-kubernetes-test-volume'
- }],
- }],
- 'hostNetwork': False,
- 'imagePullSecrets': [],
- 'volumes': [{
- 'hostPath': {'path': '/tmp/'},
- 'name': 'example-kubernetes-test-volume'
- }],
- }
- }, result)
+ result,
+ )
- @mock.patch('uuid.uuid4')
+ @mock.patch("uuid.uuid4")
def test_reconcile_pods_empty_mutator_pod(self, mock_uuid):
mock_uuid.return_value = self.static_uuid
base_pod = PodGenerator(
- image='image1',
- name='name1',
- envs={'key1': 'val1'},
- cmds=['/bin/command1.sh', 'arg1'],
- ports=[k8s.V1ContainerPort(name='port', container_port=2118)],
- volumes=[{
- 'hostPath': {'path': '/tmp/'},
- 'name': 'example-kubernetes-test-volume1'
- }],
- volume_mounts=[{
- 'mountPath': '/foo/',
- 'name': 'example-kubernetes-test-volume1'
- }],
+ image="image1",
+ name="name1",
+ envs={"key1": "val1"},
+ cmds=["/bin/command1.sh", "arg1"],
+ ports=[k8s.V1ContainerPort(name="port", container_port=2118)],
+ volumes=[
+ {
+ "hostPath": {"path": "/tmp/"},
+ "name": "example-kubernetes-test-volume1",
+ }
+ ],
+ volume_mounts=[
+ {"mountPath": "/foo/", "name": "example-kubernetes-test-volume1"}
+ ],
).gen_pod()
mutator_pod = None
- name = 'name1-' + self.static_uuid.hex
+ name = "name1-" + self.static_uuid.hex
base_pod.metadata.name = name
@@ -561,92 +603,107 @@ class TestPodGenerator(unittest.TestCase):
result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
self.assertEqual(base_pod, result)
- @mock.patch('uuid.uuid4')
+ @mock.patch("uuid.uuid4")
def test_reconcile_pods(self, mock_uuid):
mock_uuid.return_value = self.static_uuid
base_pod = PodGenerator(
- image='image1',
- name='name1',
- envs={'key1': 'val1'},
- cmds=['/bin/command1.sh', 'arg1'],
- ports=[k8s.V1ContainerPort(name='port', container_port=2118)],
- volumes=[{
- 'hostPath': {'path': '/tmp/'},
- 'name': 'example-kubernetes-test-volume1'
- }],
- volume_mounts=[{
- 'mountPath': '/foo/',
- 'name': 'example-kubernetes-test-volume1'
- }],
+ image="image1",
+ name="name1",
+ envs={"key1": "val1"},
+ cmds=["/bin/command1.sh", "arg1"],
+ ports=[k8s.V1ContainerPort(name="port", container_port=2118)],
+ volumes=[
+ {
+ "hostPath": {"path": "/tmp/"},
+ "name": "example-kubernetes-test-volume1",
+ }
+ ],
+ volume_mounts=[
+ {"mountPath": "/foo/", "name": "example-kubernetes-test-volume1"}
+ ],
).gen_pod()
mutator_pod = PodGenerator(
- envs={'key2': 'val2'},
- image='',
- name='name2',
- cmds=['/bin/command2.sh', 'arg2'],
- volumes=[{
- 'hostPath': {'path': '/tmp/'},
- 'name': 'example-kubernetes-test-volume2'
- }],
- volume_mounts=[{
- 'mountPath': '/foo/',
- 'name': 'example-kubernetes-test-volume2'
- }]
+ envs={"key2": "val2"},
+ image="",
+ name="name2",
+ cmds=["/bin/command2.sh", "arg2"],
+ volumes=[
+ {
+ "hostPath": {"path": "/tmp/"},
+ "name": "example-kubernetes-test-volume2",
+ }
+ ],
+ volume_mounts=[
+ {"mountPath": "/foo/", "name": "example-kubernetes-test-volume2"}
+ ],
).gen_pod()
result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
result = self.k8s_client.sanitize_for_serialization(result)
- self.assertEqual({
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': {'name': 'name2-' + self.static_uuid.hex},
- 'spec': {
- 'containers': [{
- 'args': [],
- 'command': ['/bin/command2.sh', 'arg2'],
- 'env': [
- {'name': 'key1', 'value': 'val1'},
- {'name': 'key2', 'value': 'val2'}
+ self.assertEqual(
+ {
+ "apiVersion": "v1",
+ "kind": "Pod",
+ "metadata": {"name": "name2-" + self.static_uuid.hex},
+ "spec": {
+ "containers": [
+ {
+ "args": [],
+ "command": ["/bin/command2.sh", "arg2"],
+ "env": [
+ {"name": "key1", "value": "val1"},
+ {"name": "key2", "value": "val2"},
+ ],
+ "envFrom": [],
+ "image": "image1",
+ "name": "base",
+ "ports": [
+ {
+ "containerPort": 2118,
+ "name": "port",
+ }
+ ],
+ "volumeMounts": [
+ {
+ "mountPath": "/foo/",
+ "name": "example-kubernetes-test-volume1",
+ },
+ {
+ "mountPath": "/foo/",
+ "name": "example-kubernetes-test-volume2",
+ },
+ ],
+ }
],
- 'envFrom': [],
- 'image': 'image1',
- 'name': 'base',
- 'ports': [{
- 'containerPort': 2118,
- 'name': 'port',
- }],
- 'volumeMounts': [{
- 'mountPath': '/foo/',
- 'name': 'example-kubernetes-test-volume2'
- }]
- }],
- 'hostNetwork': False,
- 'imagePullSecrets': [],
- 'volumes': [{
- 'hostPath': {'path': '/tmp/'},
- 'name': 'example-kubernetes-test-volume1'
- }, {
- 'hostPath': {'path': '/tmp/'},
- 'name': 'example-kubernetes-test-volume2'
- }]
- }
- }, result)
+ "hostNetwork": False,
+ "imagePullSecrets": [],
+ "volumes": [
+ {
+ "hostPath": {"path": "/tmp/"},
+ "name": "example-kubernetes-test-volume1",
+ },
+ {
+ "hostPath": {"path": "/tmp/"},
+ "name": "example-kubernetes-test-volume2",
+ },
+ ],
+ },
+ },
+ result,
+ )
- @mock.patch('uuid.uuid4')
+ @mock.patch("uuid.uuid4")
def test_construct_pod_empty_worker_config(self, mock_uuid):
mock_uuid.return_value = self.static_uuid
executor_config = k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
- name='',
+ name="",
resources=k8s.V1ResourceRequirements(
- limits={
- 'cpu': '1m',
- 'memory': '1G'
- }
- )
+ limits={"cpu": "1m", "memory": "1G"}
+ ),
)
]
)
@@ -654,61 +711,58 @@ class TestPodGenerator(unittest.TestCase):
worker_config = k8s.V1Pod()
result = PodGenerator.construct_pod(
- 'dag_id',
- 'task_id',
- 'pod_id',
+ "dag_id",
+ "task_id",
+ "pod_id",
self.try_number,
"kube_image",
self.execution_date,
- ['command'],
+ ["command"],
executor_config,
worker_config,
- 'namespace',
- 'uuid',
+ "namespace",
+ "uuid",
)
sanitized_result = self.k8s_client.sanitize_for_serialization(result)
- self.assertEqual({
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': self.metadata,
- 'spec': {
- 'containers': [{
- 'args': [],
- 'command': ['command'],
- 'env': [],
- 'envFrom': [],
- 'name': 'base',
- 'image': 'kube_image',
- 'ports': [],
- 'resources': {
- 'limits': {
- 'cpu': '1m',
- 'memory': '1G'
+ self.assertEqual(
+ {
+ "apiVersion": "v1",
+ "kind": "Pod",
+ "metadata": self.metadata,
+ "spec": {
+ "containers": [
+ {
+ "args": [],
+ "command": ["command"],
+ "env": [],
+ "envFrom": [],
+ "name": "base",
+ "image": "kube_image",
+ "ports": [],
+ "resources": {"limits": {"cpu": "1m", "memory": "1G"}},
+ "volumeMounts": [],
}
- },
- 'volumeMounts': []
- }],
- 'hostNetwork': False,
- 'imagePullSecrets': [],
- 'volumes': []
- }
- }, sanitized_result)
+ ],
+ "hostNetwork": False,
+ "imagePullSecrets": [],
+ "volumes": [],
+ },
+ },
+ sanitized_result,
+ )
- @mock.patch('uuid.uuid4')
+ @mock.patch("uuid.uuid4")
def test_construct_pod_empty_executor_config(self, mock_uuid):
mock_uuid.return_value = self.static_uuid
worker_config = k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
- name='',
+ name="",
resources=k8s.V1ResourceRequirements(
- limits={
- 'cpu': '1m',
- 'memory': '1G'
- }
- )
+ limits={"cpu": "1m", "memory": "1G"}
+ ),
)
]
)
@@ -716,225 +770,203 @@ class TestPodGenerator(unittest.TestCase):
executor_config = None
result = PodGenerator.construct_pod(
- 'dag_id',
- 'task_id',
- 'pod_id',
+ "dag_id",
+ "task_id",
+ "pod_id",
self.try_number,
"kube_image",
self.execution_date,
- ['command'],
+ ["command"],
executor_config,
worker_config,
- 'namespace',
- 'uuid',
+ "namespace",
+ "uuid",
)
sanitized_result = self.k8s_client.sanitize_for_serialization(result)
- self.assertEqual({
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': self.metadata,
- 'spec': {
- 'containers': [{
- 'args': [],
- 'command': ['command'],
- 'env': [],
- 'envFrom': [],
- 'name': 'base',
- 'image': 'kube_image',
- 'ports': [],
- 'resources': {
- 'limits': {
- 'cpu': '1m',
- 'memory': '1G'
+ self.assertEqual(
+ {
+ "apiVersion": "v1",
+ "kind": "Pod",
+ "metadata": self.metadata,
+ "spec": {
+ "containers": [
+ {
+ "args": [],
+ "command": ["command"],
+ "env": [],
+ "envFrom": [],
+ "name": "base",
+ "image": "kube_image",
+ "ports": [],
+ "resources": {"limits": {"cpu": "1m", "memory": "1G"}},
+ "volumeMounts": [],
}
- },
- 'volumeMounts': []
- }],
- 'hostNetwork': False,
- 'imagePullSecrets': [],
- 'volumes': []
- }
- }, sanitized_result)
+ ],
+ "hostNetwork": False,
+ "imagePullSecrets": [],
+ "volumes": [],
+ },
+ },
+ sanitized_result,
+ )
- @mock.patch('uuid.uuid4')
+ @mock.patch("uuid.uuid4")
def test_construct_pod(self, mock_uuid):
mock_uuid.return_value = self.static_uuid
worker_config = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
- name='gets-overridden-by-dynamic-args',
- annotations={
- 'should': 'stay'
- }
+ name="gets-overridden-by-dynamic-args", annotations={"should": "stay"}
),
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
- name='doesnt-override',
+ name="doesnt-override",
resources=k8s.V1ResourceRequirements(
- limits={
- 'cpu': '1m',
- 'memory': '1G'
- }
+ limits={"cpu": "1m", "memory": "1G"}
),
- security_context=k8s.V1SecurityContext(
- run_as_user=1
- )
+ security_context=k8s.V1SecurityContext(run_as_user=1),
)
]
- )
+ ),
)
executor_config = k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
- name='doesnt-override-either',
+ name="doesnt-override-either",
resources=k8s.V1ResourceRequirements(
- limits={
- 'cpu': '2m',
- 'memory': '2G'
- }
- )
+ limits={"cpu": "2m", "memory": "2G"}
+ ),
)
]
)
)
result = PodGenerator.construct_pod(
- 'dag_id',
- 'task_id',
- 'pod_id',
+ "dag_id",
+ "task_id",
+ "pod_id",
self.try_number,
"kube_image",
self.execution_date,
- ['command'],
+ ["command"],
executor_config,
worker_config,
- 'namespace',
- 'uuid',
+ "namespace",
+ "uuid",
)
sanitized_result = self.k8s_client.sanitize_for_serialization(result)
- self.metadata['annotations']['should'] = 'stay'
-
- self.assertEqual({
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': self.metadata,
- 'spec': {
- 'containers': [{
- 'args': [],
- 'command': ['command'],
- 'env': [],
- 'envFrom': [],
- 'image': 'kube_image',
- 'name': 'base',
- 'ports': [],
- 'resources': {
- 'limits': {
- 'cpu': '2m',
- 'memory': '2G'
+ self.metadata["annotations"]["should"] = "stay"
+
+ self.assertEqual(
+ {
+ "apiVersion": "v1",
+ "kind": "Pod",
+ "metadata": self.metadata,
+ "spec": {
+ "containers": [
+ {
+ "args": [],
+ "command": ["command"],
+ "env": [],
+ "envFrom": [],
+ "image": "kube_image",
+ "name": "base",
+ "ports": [],
+ "resources": {"limits": {"cpu": "2m", "memory": "2G"}},
+ "volumeMounts": [],
+ "securityContext": {"runAsUser": 1},
}
- },
- 'volumeMounts': [],
- 'securityContext': {'runAsUser': 1}
- }],
- 'hostNetwork': False,
- 'imagePullSecrets': [],
- 'volumes': []
- }
- }, sanitized_result)
+ ],
+ "hostNetwork": False,
+ "imagePullSecrets": [],
+ "volumes": [],
+ },
+ },
+ sanitized_result,
+ )
- @mock.patch('uuid.uuid4')
+ @mock.patch("uuid.uuid4")
def test_construct_pod_with_mutation(self, mock_uuid):
mock_uuid.return_value = self.static_uuid
worker_config = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
- name='gets-overridden-by-dynamic-args',
- annotations={
- 'should': 'stay'
- }
+ name="gets-overridden-by-dynamic-args", annotations={"should": "stay"}
),
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
- name='doesnt-override',
+ name="doesnt-override",
resources=k8s.V1ResourceRequirements(
- limits={
- 'cpu': '1m',
- 'memory': '1G'
- }
+ limits={"cpu": "1m", "memory": "1G"}
),
- security_context=k8s.V1SecurityContext(
- run_as_user=1
- )
+ security_context=k8s.V1SecurityContext(run_as_user=1),
)
]
- )
+ ),
)
executor_config = k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
- name='doesnt-override-either',
+ name="doesnt-override-either",
resources=k8s.V1ResourceRequirements(
- limits={
- 'cpu': '2m',
- 'memory': '2G'
- }
- )
+ limits={"cpu": "2m", "memory": "2G"}
+ ),
)
]
)
)
result = PodGenerator.construct_pod(
- dag_id='dag_id',
- task_id='task_id',
- pod_id='pod_id',
+ dag_id="dag_id",
+ task_id="task_id",
+ pod_id="pod_id",
try_number=3,
- kube_image='kube_image',
+ kube_image="kube_image",
date=self.execution_date,
- command=['command'],
+ command=["command"],
pod_override_object=executor_config,
base_worker_pod=worker_config,
- namespace='namespace',
- worker_uuid='uuid',
+ namespace="namespace",
+ worker_uuid="uuid",
)
sanitized_result = self.k8s_client.sanitize_for_serialization(result)
- self.metadata['annotations']['should'] = 'stay'
-
- self.assertEqual({
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': self.metadata,
- 'spec': {
- 'containers': [{
- 'args': [],
- 'command': ['command'],
- 'env': [],
- 'envFrom': [],
- 'name': 'base',
- 'image': 'kube_image',
- 'ports': [],
- 'resources': {
- 'limits': {
- 'cpu': '2m',
- 'memory': '2G'
+ self.metadata["annotations"]["should"] = "stay"
+
+ self.assertEqual(
+ {
+ "apiVersion": "v1",
+ "kind": "Pod",
+ "metadata": self.metadata,
+ "spec": {
+ "containers": [
+ {
+ "args": [],
+ "command": ["command"],
+ "env": [],
+ "envFrom": [],
+ "name": "base",
+ "image": "kube_image",
+ "ports": [],
+ "resources": {"limits": {"cpu": "2m", "memory": "2G"}},
+ "volumeMounts": [],
+ "securityContext": {"runAsUser": 1},
}
- },
- 'volumeMounts': [],
- 'securityContext': {'runAsUser': 1}
- }],
- 'hostNetwork': False,
- 'imagePullSecrets': [],
- 'volumes': []
- }
- }, sanitized_result)
+ ],
+ "hostNetwork": False,
+ "imagePullSecrets": [],
+ "volumes": [],
+ },
+ },
+ sanitized_result,
+ )
def test_merge_objects_empty(self):
- annotations = {'foo1': 'bar1'}
+ annotations = {"foo1": "bar1"}
base_obj = k8s.V1ObjectMeta(annotations=annotations)
client_obj = None
res = merge_objects(base_obj, client_obj)
@@ -954,57 +986,54 @@ class TestPodGenerator(unittest.TestCase):
self.assertEqual(client_obj, res)
def test_merge_objects(self):
- base_annotations = {'foo1': 'bar1'}
- base_labels = {'foo1': 'bar1'}
- client_annotations = {'foo2': 'bar2'}
- base_obj = k8s.V1ObjectMeta(
- annotations=base_annotations,
- labels=base_labels
- )
+ base_annotations = {"foo1": "bar1"}
+ base_labels = {"foo1": "bar1"}
+ client_annotations = {"foo2": "bar2"}
+ base_obj = k8s.V1ObjectMeta(annotations=base_annotations, labels=base_labels)
client_obj = k8s.V1ObjectMeta(annotations=client_annotations)
res = merge_objects(base_obj, client_obj)
client_obj.labels = base_labels
self.assertEqual(client_obj, res)
def test_extend_object_field_empty(self):
- ports = [k8s.V1ContainerPort(container_port=1, name='port')]
- base_obj = k8s.V1Container(name='base_container', ports=ports)
- client_obj = k8s.V1Container(name='client_container')
- res = extend_object_field(base_obj, client_obj, 'ports')
+ ports = [k8s.V1ContainerPort(container_port=1, name="port")]
+ base_obj = k8s.V1Container(name="base_container", ports=ports)
+ client_obj = k8s.V1Container(name="client_container")
+ res = extend_object_field(base_obj, client_obj, "ports")
client_obj.ports = ports
self.assertEqual(client_obj, res)
- base_obj = k8s.V1Container(name='base_container')
- client_obj = k8s.V1Container(name='base_container', ports=ports)
- res = extend_object_field(base_obj, client_obj, 'ports')
+ base_obj = k8s.V1Container(name="base_container")
+ client_obj = k8s.V1Container(name="base_container", ports=ports)
+ res = extend_object_field(base_obj, client_obj, "ports")
self.assertEqual(client_obj, res)
def test_extend_object_field_not_list(self):
- base_obj = k8s.V1Container(name='base_container', image='image')
- client_obj = k8s.V1Container(name='client_container')
+ base_obj = k8s.V1Container(name="base_container", image="image")
+ client_obj = k8s.V1Container(name="client_container")
with self.assertRaises(ValueError):
- extend_object_field(base_obj, client_obj, 'image')
- base_obj = k8s.V1Container(name='base_container')
- client_obj = k8s.V1Container(name='client_container', image='image')
+ extend_object_field(base_obj, client_obj, "image")
+ base_obj = k8s.V1Container(name="base_container")
+ client_obj = k8s.V1Container(name="client_container", image="image")
with self.assertRaises(ValueError):
- extend_object_field(base_obj, client_obj, 'image')
+ extend_object_field(base_obj, client_obj, "image")
def test_extend_object_field(self):
- base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
- base_obj = k8s.V1Container(name='base_container', ports=base_ports)
- client_ports = [k8s.V1ContainerPort(container_port=1, name='client_port')]
- client_obj = k8s.V1Container(name='client_container', ports=client_ports)
- res = extend_object_field(base_obj, client_obj, 'ports')
+ base_ports = [k8s.V1ContainerPort(container_port=1, name="base_port")]
+ base_obj = k8s.V1Container(name="base_container", ports=base_ports)
+ client_ports = [k8s.V1ContainerPort(container_port=1, name="client_port")]
+ client_obj = k8s.V1Container(name="client_container", ports=client_ports)
+ res = extend_object_field(base_obj, client_obj, "ports")
client_obj.ports = base_ports + client_ports
self.assertEqual(client_obj, res)
def test_reconcile_containers_empty(self):
- base_objs = [k8s.V1Container(name='base_container')]
+ base_objs = [k8s.V1Container(name="base_container")]
client_objs = []
res = PodGenerator.reconcile_containers(base_objs, client_objs)
self.assertEqual(base_objs, res)
- client_objs = [k8s.V1Container(name='client_container')]
+ client_objs = [k8s.V1Container(name="client_container")]
base_objs = []
res = PodGenerator.reconcile_containers(base_objs, client_objs)
self.assertEqual(client_objs, res)
@@ -1013,33 +1042,33 @@ class TestPodGenerator(unittest.TestCase):
self.assertEqual(res, [])
def test_reconcile_containers(self):
- base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
+ base_ports = [k8s.V1ContainerPort(container_port=1, name="base_port")]
base_objs = [
- k8s.V1Container(name='base_container1', ports=base_ports),
- k8s.V1Container(name='base_container2', image='base_image'),
+ k8s.V1Container(name="base_container1", ports=base_ports),
+ k8s.V1Container(name="base_container2", image="base_image"),
]
- client_ports = [k8s.V1ContainerPort(container_port=2, name='client_port')]
+ client_ports = [k8s.V1ContainerPort(container_port=2, name="client_port")]
client_objs = [
- k8s.V1Container(name='client_container1', ports=client_ports),
- k8s.V1Container(name='client_container2', image='client_image'),
+ k8s.V1Container(name="client_container1", ports=client_ports),
+ k8s.V1Container(name="client_container2", image="client_image"),
]
res = PodGenerator.reconcile_containers(base_objs, client_objs)
client_objs[0].ports = base_ports + client_ports
self.assertEqual(client_objs, res)
- base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
+ base_ports = [k8s.V1ContainerPort(container_port=1, name="base_port")]
base_objs = [
- k8s.V1Container(name='base_container1', ports=base_ports),
- k8s.V1Container(name='base_container2', image='base_image'),
+ k8s.V1Container(name="base_container1", ports=base_ports),
+ k8s.V1Container(name="base_container2", image="base_image"),
]
- client_ports = [k8s.V1ContainerPort(container_port=2, name='client_port')]
+ client_ports = [k8s.V1ContainerPort(container_port=2, name="client_port")]
client_objs = [
- k8s.V1Container(name='client_container1', ports=client_ports),
- k8s.V1Container(name='client_container2', stdin=True),
+ k8s.V1Container(name="client_container1", ports=client_ports),
+ k8s.V1Container(name="client_container2", stdin=True),
]
res = PodGenerator.reconcile_containers(base_objs, client_objs)
client_objs[0].ports = base_ports + client_ports
- client_objs[1].image = 'base_image'
+ client_objs[1].image = "base_image"
self.assertEqual(client_objs, res)
def test_reconcile_specs_empty(self):
@@ -1054,17 +1083,23 @@ class TestPodGenerator(unittest.TestCase):
self.assertEqual(client_spec, res)
def test_reconcile_specs(self):
- base_objs = [k8s.V1Container(name='base_container1', image='base_image')]
- client_objs = [k8s.V1Container(name='client_container1')]
- base_spec = k8s.V1PodSpec(priority=1, active_deadline_seconds=100, containers=base_objs)
- client_spec = k8s.V1PodSpec(priority=2, hostname='local', containers=client_objs)
+ base_objs = [k8s.V1Container(name="base_container1", image="base_image")]
+ client_objs = [k8s.V1Container(name="client_container1")]
+ base_spec = k8s.V1PodSpec(
+ priority=1, active_deadline_seconds=100, containers=base_objs
+ )
+ client_spec = k8s.V1PodSpec(
+ priority=2, hostname="local", containers=client_objs
+ )
res = PodGenerator.reconcile_specs(base_spec, client_spec)
- client_spec.containers = [k8s.V1Container(name='client_container1', image='base_image')]
+ client_spec.containers = [
+ k8s.V1Container(name="client_container1", image="base_image")
+ ]
client_spec.active_deadline_seconds = 100
self.assertEqual(client_spec, res)
def test_deserialize_model_file(self):
- fixture = sys.path[0] + '/tests/kubernetes/pod.yaml'
+ fixture = sys.path[0] + "/tests/kubernetes/pod.yaml"
result = PodGenerator.deserialize_model_file(fixture)
sanitized_res = self.k8s_client.sanitize_for_serialization(result)
self.assertEqual(sanitized_res, self.deserialize_result)
@@ -1107,7 +1142,11 @@ spec:
command="test",
pod_override_object=None,
base_worker_pod=k8s.V1Pod(
- metadata=k8s.V1ObjectMeta(labels={"airflow-test": "airflow-task-pod"},
- annotations={"my.annotation": "foo"})))
+ metadata=k8s.V1ObjectMeta(
+ labels={"airflow-test": "airflow-task-pod"},
+ annotations={"my.annotation": "foo"},
+ )
+ ),
+ )
self.assertIn("airflow-test", pod.metadata.labels)
self.assertIn("my.annotation", pod.metadata.annotations)