You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/02 23:29:11 UTC
[airflow] 01/33: Fix issue with empty Resources in executor_config
(#12633)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f3f9cd28d0a0aebc4ede947bb340186db0f6f844
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Fri Nov 27 05:52:49 2020 -0800
Fix issue with empty Resources in executor_config (#12633)
Fixes an issue where, if a user specifies a request but not a limit in the
resources for the executor_config, the backwards-compatibility code cannot parse it.
(cherry picked from commit 84eecf94bab1a8c66b5161f03c6631448fb4850e)
---
airflow/contrib/kubernetes/pod.py | 4 +-
tests/kubernetes/models/test_pod.py | 240 ++++++++++++++++++++++--------------
2 files changed, 152 insertions(+), 92 deletions(-)
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index d1f30a8..7e38147 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -250,8 +250,8 @@ def _extract_ports(ports):
def _extract_resources(resources):
if isinstance(resources, k8s.V1ResourceRequirements):
- requests = resources.requests
- limits = resources.limits
+ requests = resources.requests or {}
+ limits = resources.limits or {}
return Resources(
request_memory=requests.get('memory', None),
request_cpu=requests.get('cpu', None),
diff --git a/tests/kubernetes/models/test_pod.py b/tests/kubernetes/models/test_pod.py
index f8df28a..6939597 100644
--- a/tests/kubernetes/models/test_pod.py
+++ b/tests/kubernetes/models/test_pod.py
@@ -19,63 +19,88 @@ from tests.compat import mock
from kubernetes.client import ApiClient
import kubernetes.client.models as k8s
from airflow.kubernetes.pod import Port
+from airflow.kubernetes.pod import Resources
+from airflow.contrib.kubernetes.pod import _extract_resources
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.kubernetes.k8s_model import append_to_pod
class TestPod(unittest.TestCase):
+ def test_extract_resources(self):
+ res = _extract_resources(k8s.V1ResourceRequirements())
+ self.assertEqual(
+ res.to_k8s_client_obj().to_dict(), Resources().to_k8s_client_obj().to_dict()
+ )
+ res = _extract_resources(k8s.V1ResourceRequirements(limits={"memory": "1G"}))
+ self.assertEqual(
+ res.to_k8s_client_obj().to_dict(),
+ Resources(limit_memory="1G").to_k8s_client_obj().to_dict(),
+ )
+ res = _extract_resources(k8s.V1ResourceRequirements(requests={"memory": "1G"}))
+ self.assertEqual(
+ res.to_k8s_client_obj().to_dict(),
+ Resources(request_memory="1G").to_k8s_client_obj().to_dict(),
+ )
+ res = _extract_resources(
+ k8s.V1ResourceRequirements(
+ limits={"memory": "1G"}, requests={"memory": "1G"}
+ )
+ )
+ self.assertEqual(
+ res.to_k8s_client_obj().to_dict(),
+ Resources(limit_memory="1G", request_memory="1G")
+ .to_k8s_client_obj()
+ .to_dict(),
+ )
def test_port_to_k8s_client_obj(self):
- port = Port('http', 80)
+ port = Port("http", 80)
self.assertEqual(
port.to_k8s_client_obj(),
- k8s.V1ContainerPort(
- name='http',
- container_port=80
- )
+ k8s.V1ContainerPort(name="http", container_port=80),
)
- @mock.patch('uuid.uuid4')
+ @mock.patch("uuid.uuid4")
def test_port_attach_to_pod(self, mock_uuid):
import uuid
- static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48')
+
+ static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48")
mock_uuid.return_value = static_uuid
- pod = PodGenerator(image='airflow-worker:latest', name='base').gen_pod()
- ports = [
- Port('https', 443),
- Port('http', 80)
- ]
+ pod = PodGenerator(image="airflow-worker:latest", name="base").gen_pod()
+ ports = [Port("https", 443), Port("http", 80)]
k8s_client = ApiClient()
result = append_to_pod(pod, ports)
result = k8s_client.sanitize_for_serialization(result)
- self.assertEqual({
- 'apiVersion': 'v1',
- 'kind': 'Pod',
- 'metadata': {'name': 'base-' + static_uuid.hex},
- 'spec': {
- 'containers': [{
- 'args': [],
- 'command': [],
- 'env': [],
- 'envFrom': [],
- 'image': 'airflow-worker:latest',
- 'name': 'base',
- 'ports': [{
- 'name': 'https',
- 'containerPort': 443
- }, {
- 'name': 'http',
- 'containerPort': 80
- }],
- 'volumeMounts': [],
- }],
- 'hostNetwork': False,
- 'imagePullSecrets': [],
- 'volumes': []
- }
- }, result)
+ self.assertEqual(
+ {
+ "apiVersion": "v1",
+ "kind": "Pod",
+ "metadata": {"name": "base-" + static_uuid.hex},
+ "spec": {
+ "containers": [
+ {
+ "args": [],
+ "command": [],
+ "env": [],
+ "envFrom": [],
+ "image": "airflow-worker:latest",
+ "name": "base",
+ "ports": [
+ {"name": "https", "containerPort": 443},
+ {"name": "http", "containerPort": 80},
+ ],
+ "volumeMounts": [],
+ }
+ ],
+ "hostNetwork": False,
+ "imagePullSecrets": [],
+ "volumes": [],
+ },
+ },
+ result,
+ )
- @mock.patch('uuid.uuid4')
+ @mock.patch("uuid.uuid4")
def test_to_v1_pod(self, mock_uuid):
from airflow.contrib.kubernetes.pod import Pod as DeprecatedPod
from airflow.kubernetes.volume import Volume
@@ -83,7 +108,8 @@ class TestPod(unittest.TestCase):
from airflow.kubernetes.secret import Secret
from airflow.kubernetes.pod import Resources
import uuid
- static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48')
+
+ static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48")
mock_uuid.return_value = static_uuid
pod = DeprecatedPod(
@@ -94,24 +120,26 @@ class TestPod(unittest.TestCase):
envs={"test_key": "test_value"},
cmds=["airflow"],
resources=Resources(
- request_memory="1G",
- request_cpu="100Mi",
- limit_gpu="100G"
+ request_memory="1G", request_cpu="100Mi", limit_gpu="100G"
),
init_containers=k8s.V1Container(
name="test-container",
- volume_mounts=k8s.V1VolumeMount(mount_path="/foo/bar", name="init-volume-secret")
+ volume_mounts=k8s.V1VolumeMount(
+ mount_path="/foo/bar", name="init-volume-secret"
+ ),
),
volumes=[
Volume(name="foo", configs={}),
- {"name": "bar", 'secret': {'secretName': 'volume-secret'}}
+ {"name": "bar", "secret": {"secretName": "volume-secret"}},
],
secrets=[
Secret("volume", None, "init-volume-secret"),
- Secret('env', "AIRFLOW_SECRET", 'secret_name', "airflow_config"),
- Secret("volume", "/opt/airflow", "volume-secret", "secret-key")
+ Secret("env", "AIRFLOW_SECRET", "secret_name", "airflow_config"),
+ Secret("volume", "/opt/airflow", "volume-secret", "secret-key"),
+ ],
+ volume_mounts=[
+ VolumeMount(name="foo", mount_path="/mnt", sub_path="/", read_only=True)
],
- volume_mounts=[VolumeMount(name="foo", mount_path="/mnt", sub_path="/", read_only=True)]
)
k8s_client = ApiClient()
@@ -119,47 +147,79 @@ class TestPod(unittest.TestCase):
result = pod.to_v1_kubernetes_pod()
result = k8s_client.sanitize_for_serialization(result)
- expected = \
- {'metadata': {'annotations': {},
- 'labels': {},
- 'name': 'bar',
- 'namespace': 'baz'},
- 'spec': {'affinity': {},
- 'containers': [{'args': [],
- 'command': ['airflow'],
- 'env': [{'name': 'test_key', 'value': 'test_value'},
- {'name': 'AIRFLOW_SECRET',
- 'valueFrom': {'secretKeyRef': {'key': 'airflow_config',
- 'name': 'secret_name'}}}],
- 'envFrom': [],
- 'image': 'foo',
- 'imagePullPolicy': 'Never',
- 'name': 'base',
- 'resources': {'limits': {'nvidia.com/gpu': '100G'},
- 'requests': {'cpu': '100Mi',
- 'memory': '1G'}},
- 'volumeMounts': [{'mountPath': '/mnt',
- 'name': 'foo',
- 'readOnly': True,
- 'subPath': '/'},
- {'mountPath': '/opt/airflow',
- 'name': 'secretvol' + str(static_uuid),
- 'readOnly': True}]}],
- 'hostNetwork': False,
- 'imagePullSecrets': [],
- 'initContainers': {'name': 'test-container',
- 'volumeMounts': {'mountPath': '/foo/bar',
- 'name': 'init-volume-secret'}},
- 'nodeSelector': {},
- 'securityContext': {},
- 'tolerations': [],
- 'volumes': [{'name': 'foo'},
- {'name': 'bar',
- 'secret': {'secretName': 'volume-secret'}},
- {'name': 'secretvol' + str(static_uuid),
- 'secret': {'secretName': 'init-volume-secret'}},
- {'name': 'secretvol' + str(static_uuid),
- 'secret': {'secretName': 'volume-secret'}}
- ]}}
+ expected = {
+ "metadata": {
+ "annotations": {},
+ "labels": {},
+ "name": "bar",
+ "namespace": "baz",
+ },
+ "spec": {
+ "affinity": {},
+ "containers": [
+ {
+ "args": [],
+ "command": ["airflow"],
+ "env": [
+ {"name": "test_key", "value": "test_value"},
+ {
+ "name": "AIRFLOW_SECRET",
+ "valueFrom": {
+ "secretKeyRef": {
+ "key": "airflow_config",
+ "name": "secret_name",
+ }
+ },
+ },
+ ],
+ "envFrom": [],
+ "image": "foo",
+ "imagePullPolicy": "Never",
+ "name": "base",
+ "resources": {
+ "limits": {"nvidia.com/gpu": "100G"},
+ "requests": {"cpu": "100Mi", "memory": "1G"},
+ },
+ "volumeMounts": [
+ {
+ "mountPath": "/mnt",
+ "name": "foo",
+ "readOnly": True,
+ "subPath": "/",
+ },
+ {
+ "mountPath": "/opt/airflow",
+ "name": "secretvol" + str(static_uuid),
+ "readOnly": True,
+ },
+ ],
+ }
+ ],
+ "hostNetwork": False,
+ "imagePullSecrets": [],
+ "initContainers": {
+ "name": "test-container",
+ "volumeMounts": {
+ "mountPath": "/foo/bar",
+ "name": "init-volume-secret",
+ },
+ },
+ "nodeSelector": {},
+ "securityContext": {},
+ "tolerations": [],
+ "volumes": [
+ {"name": "foo"},
+ {"name": "bar", "secret": {"secretName": "volume-secret"}},
+ {
+ "name": "secretvol" + str(static_uuid),
+ "secret": {"secretName": "init-volume-secret"},
+ },
+ {
+ "name": "secretvol" + str(static_uuid),
+ "secret": {"secretName": "volume-secret"},
+ },
+ ],
+ },
+ }
self.maxDiff = None
self.assertEqual(expected, result)