You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/27 13:53:59 UTC

[airflow] branch v1-10-test updated: Fix issue with empty Resources in executor_config (#12633)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 74e0247  Fix issue with empty Resources in executor_config (#12633)
74e0247 is described below

commit 74e02477bdd83880c2bc2b57ded36ef730dcacbb
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Fri Nov 27 05:52:49 2020 -0800

    Fix issue with empty Resources in executor_config (#12633)
    
    Fixes an issue where if a user specifies a request but not a limit in
    resources for the executor_config, backwards compat can not parse
    
    (cherry picked from commit 84eecf94bab1a8c66b5161f03c6631448fb4850e)
---
 airflow/contrib/kubernetes/pod.py   |   4 +-
 tests/kubernetes/models/test_pod.py | 240 ++++++++++++++++++++++--------------
 2 files changed, 152 insertions(+), 92 deletions(-)

diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index d1f30a8..7e38147 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -250,8 +250,8 @@ def _extract_ports(ports):
 
 def _extract_resources(resources):
     if isinstance(resources, k8s.V1ResourceRequirements):
-        requests = resources.requests
-        limits = resources.limits
+        requests = resources.requests or {}
+        limits = resources.limits or {}
         return Resources(
             request_memory=requests.get('memory', None),
             request_cpu=requests.get('cpu', None),
diff --git a/tests/kubernetes/models/test_pod.py b/tests/kubernetes/models/test_pod.py
index f8df28a..6939597 100644
--- a/tests/kubernetes/models/test_pod.py
+++ b/tests/kubernetes/models/test_pod.py
@@ -19,63 +19,88 @@ from tests.compat import mock
 from kubernetes.client import ApiClient
 import kubernetes.client.models as k8s
 from airflow.kubernetes.pod import Port
+from airflow.kubernetes.pod import Resources
+from airflow.contrib.kubernetes.pod import _extract_resources
 from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.kubernetes.k8s_model import append_to_pod
 
 
 class TestPod(unittest.TestCase):
+    def test_extract_resources(self):
+        res = _extract_resources(k8s.V1ResourceRequirements())
+        self.assertEqual(
+            res.to_k8s_client_obj().to_dict(), Resources().to_k8s_client_obj().to_dict()
+        )
+        res = _extract_resources(k8s.V1ResourceRequirements(limits={"memory": "1G"}))
+        self.assertEqual(
+            res.to_k8s_client_obj().to_dict(),
+            Resources(limit_memory="1G").to_k8s_client_obj().to_dict(),
+        )
+        res = _extract_resources(k8s.V1ResourceRequirements(requests={"memory": "1G"}))
+        self.assertEqual(
+            res.to_k8s_client_obj().to_dict(),
+            Resources(request_memory="1G").to_k8s_client_obj().to_dict(),
+        )
+        res = _extract_resources(
+            k8s.V1ResourceRequirements(
+                limits={"memory": "1G"}, requests={"memory": "1G"}
+            )
+        )
+        self.assertEqual(
+            res.to_k8s_client_obj().to_dict(),
+            Resources(limit_memory="1G", request_memory="1G")
+            .to_k8s_client_obj()
+            .to_dict(),
+        )
 
     def test_port_to_k8s_client_obj(self):
-        port = Port('http', 80)
+        port = Port("http", 80)
         self.assertEqual(
             port.to_k8s_client_obj(),
-            k8s.V1ContainerPort(
-                name='http',
-                container_port=80
-            )
+            k8s.V1ContainerPort(name="http", container_port=80),
         )
 
-    @mock.patch('uuid.uuid4')
+    @mock.patch("uuid.uuid4")
     def test_port_attach_to_pod(self, mock_uuid):
         import uuid
-        static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48')
+
+        static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48")
         mock_uuid.return_value = static_uuid
-        pod = PodGenerator(image='airflow-worker:latest', name='base').gen_pod()
-        ports = [
-            Port('https', 443),
-            Port('http', 80)
-        ]
+        pod = PodGenerator(image="airflow-worker:latest", name="base").gen_pod()
+        ports = [Port("https", 443), Port("http", 80)]
         k8s_client = ApiClient()
         result = append_to_pod(pod, ports)
         result = k8s_client.sanitize_for_serialization(result)
-        self.assertEqual({
-            'apiVersion': 'v1',
-            'kind': 'Pod',
-            'metadata': {'name': 'base-' + static_uuid.hex},
-            'spec': {
-                'containers': [{
-                    'args': [],
-                    'command': [],
-                    'env': [],
-                    'envFrom': [],
-                    'image': 'airflow-worker:latest',
-                    'name': 'base',
-                    'ports': [{
-                        'name': 'https',
-                        'containerPort': 443
-                    }, {
-                        'name': 'http',
-                        'containerPort': 80
-                    }],
-                    'volumeMounts': [],
-                }],
-                'hostNetwork': False,
-                'imagePullSecrets': [],
-                'volumes': []
-            }
-        }, result)
+        self.assertEqual(
+            {
+                "apiVersion": "v1",
+                "kind": "Pod",
+                "metadata": {"name": "base-" + static_uuid.hex},
+                "spec": {
+                    "containers": [
+                        {
+                            "args": [],
+                            "command": [],
+                            "env": [],
+                            "envFrom": [],
+                            "image": "airflow-worker:latest",
+                            "name": "base",
+                            "ports": [
+                                {"name": "https", "containerPort": 443},
+                                {"name": "http", "containerPort": 80},
+                            ],
+                            "volumeMounts": [],
+                        }
+                    ],
+                    "hostNetwork": False,
+                    "imagePullSecrets": [],
+                    "volumes": [],
+                },
+            },
+            result,
+        )
 
-    @mock.patch('uuid.uuid4')
+    @mock.patch("uuid.uuid4")
     def test_to_v1_pod(self, mock_uuid):
         from airflow.contrib.kubernetes.pod import Pod as DeprecatedPod
         from airflow.kubernetes.volume import Volume
@@ -83,7 +108,8 @@ class TestPod(unittest.TestCase):
         from airflow.kubernetes.secret import Secret
         from airflow.kubernetes.pod import Resources
         import uuid
-        static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48')
+
+        static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48")
         mock_uuid.return_value = static_uuid
 
         pod = DeprecatedPod(
@@ -94,24 +120,26 @@ class TestPod(unittest.TestCase):
             envs={"test_key": "test_value"},
             cmds=["airflow"],
             resources=Resources(
-                request_memory="1G",
-                request_cpu="100Mi",
-                limit_gpu="100G"
+                request_memory="1G", request_cpu="100Mi", limit_gpu="100G"
             ),
             init_containers=k8s.V1Container(
                 name="test-container",
-                volume_mounts=k8s.V1VolumeMount(mount_path="/foo/bar", name="init-volume-secret")
+                volume_mounts=k8s.V1VolumeMount(
+                    mount_path="/foo/bar", name="init-volume-secret"
+                ),
             ),
             volumes=[
                 Volume(name="foo", configs={}),
-                {"name": "bar", 'secret': {'secretName': 'volume-secret'}}
+                {"name": "bar", "secret": {"secretName": "volume-secret"}},
             ],
             secrets=[
                 Secret("volume", None, "init-volume-secret"),
-                Secret('env', "AIRFLOW_SECRET", 'secret_name', "airflow_config"),
-                Secret("volume", "/opt/airflow", "volume-secret", "secret-key")
+                Secret("env", "AIRFLOW_SECRET", "secret_name", "airflow_config"),
+                Secret("volume", "/opt/airflow", "volume-secret", "secret-key"),
+            ],
+            volume_mounts=[
+                VolumeMount(name="foo", mount_path="/mnt", sub_path="/", read_only=True)
             ],
-            volume_mounts=[VolumeMount(name="foo", mount_path="/mnt", sub_path="/", read_only=True)]
         )
 
         k8s_client = ApiClient()
@@ -119,47 +147,79 @@ class TestPod(unittest.TestCase):
         result = pod.to_v1_kubernetes_pod()
         result = k8s_client.sanitize_for_serialization(result)
 
-        expected = \
-            {'metadata': {'annotations': {},
-                          'labels': {},
-                          'name': 'bar',
-                          'namespace': 'baz'},
-             'spec': {'affinity': {},
-                      'containers': [{'args': [],
-                                      'command': ['airflow'],
-                                      'env': [{'name': 'test_key', 'value': 'test_value'},
-                                              {'name': 'AIRFLOW_SECRET',
-                                               'valueFrom': {'secretKeyRef': {'key': 'airflow_config',
-                                                                              'name': 'secret_name'}}}],
-                                      'envFrom': [],
-                                      'image': 'foo',
-                                      'imagePullPolicy': 'Never',
-                                      'name': 'base',
-                                      'resources': {'limits': {'nvidia.com/gpu': '100G'},
-                                                    'requests': {'cpu': '100Mi',
-                                                                 'memory': '1G'}},
-                                      'volumeMounts': [{'mountPath': '/mnt',
-                                                        'name': 'foo',
-                                                        'readOnly': True,
-                                                        'subPath': '/'},
-                                                       {'mountPath': '/opt/airflow',
-                                                        'name': 'secretvol' + str(static_uuid),
-                                                        'readOnly': True}]}],
-                      'hostNetwork': False,
-                      'imagePullSecrets': [],
-                      'initContainers': {'name': 'test-container',
-                                         'volumeMounts': {'mountPath': '/foo/bar',
-                                                          'name': 'init-volume-secret'}},
-                      'nodeSelector': {},
-                      'securityContext': {},
-                      'tolerations': [],
-                      'volumes': [{'name': 'foo'},
-                                  {'name': 'bar',
-                                   'secret': {'secretName': 'volume-secret'}},
-                                  {'name': 'secretvol' + str(static_uuid),
-                                   'secret': {'secretName': 'init-volume-secret'}},
-                                  {'name': 'secretvol' + str(static_uuid),
-                                   'secret': {'secretName': 'volume-secret'}}
-                                  ]}}
+        expected = {
+            "metadata": {
+                "annotations": {},
+                "labels": {},
+                "name": "bar",
+                "namespace": "baz",
+            },
+            "spec": {
+                "affinity": {},
+                "containers": [
+                    {
+                        "args": [],
+                        "command": ["airflow"],
+                        "env": [
+                            {"name": "test_key", "value": "test_value"},
+                            {
+                                "name": "AIRFLOW_SECRET",
+                                "valueFrom": {
+                                    "secretKeyRef": {
+                                        "key": "airflow_config",
+                                        "name": "secret_name",
+                                    }
+                                },
+                            },
+                        ],
+                        "envFrom": [],
+                        "image": "foo",
+                        "imagePullPolicy": "Never",
+                        "name": "base",
+                        "resources": {
+                            "limits": {"nvidia.com/gpu": "100G"},
+                            "requests": {"cpu": "100Mi", "memory": "1G"},
+                        },
+                        "volumeMounts": [
+                            {
+                                "mountPath": "/mnt",
+                                "name": "foo",
+                                "readOnly": True,
+                                "subPath": "/",
+                            },
+                            {
+                                "mountPath": "/opt/airflow",
+                                "name": "secretvol" + str(static_uuid),
+                                "readOnly": True,
+                            },
+                        ],
+                    }
+                ],
+                "hostNetwork": False,
+                "imagePullSecrets": [],
+                "initContainers": {
+                    "name": "test-container",
+                    "volumeMounts": {
+                        "mountPath": "/foo/bar",
+                        "name": "init-volume-secret",
+                    },
+                },
+                "nodeSelector": {},
+                "securityContext": {},
+                "tolerations": [],
+                "volumes": [
+                    {"name": "foo"},
+                    {"name": "bar", "secret": {"secretName": "volume-secret"}},
+                    {
+                        "name": "secretvol" + str(static_uuid),
+                        "secret": {"secretName": "init-volume-secret"},
+                    },
+                    {
+                        "name": "secretvol" + str(static_uuid),
+                        "secret": {"secretName": "volume-secret"},
+                    },
+                ],
+            },
+        }
         self.maxDiff = None
         self.assertEqual(expected, result)