You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/11 22:35:00 UTC

[airflow] 19/32: Fix bug in executor_config when defining resources (#9935)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 05ec21a22f84cdbe2aaed38b712c30f2cbb38b59
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Thu Jul 23 19:52:20 2020 -0700

    Fix bug in executor_config when defining resources (#9935)
    
    * Fix PodGenerator to handle Kubernetes resources
    
    In Airflow 1.10.11, `namespaced['resources'] = resources` is missing.
    This PR improves the definition of pod resources, `requests` and `limits` are optional.
    
    * Make it working in 2.7
    
    * Add limit_gpu and fix ephemeral-storage keys
    
    * Fix flake8
    
    Co-authored-by: Riccardo Bini <od...@gmail.com>
---
 airflow/kubernetes/pod.py              |   4 +-
 airflow/kubernetes/pod_generator.py    |  33 ++++++---
 tests/kubernetes/test_pod_generator.py | 132 +++++++++++++++++++++++++++++++++
 3 files changed, 155 insertions(+), 14 deletions(-)

diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index b1df462..0b332c2 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -33,7 +33,7 @@ class Resources(K8SModel):
     :type request_memory: str
     :param request_cpu: requested CPU number
     :type request_cpu: float | str
-    :param request_ephemeral_storage: requested ephermeral storage
+    :param request_ephemeral_storage: requested ephemeral storage
     :type request_ephemeral_storage: str
     :param limit_memory: limit for memory usage
     :type limit_memory: str
@@ -41,7 +41,7 @@ class Resources(K8SModel):
     :type limit_cpu: float | str
     :param limit_gpu: Limits for GPU used
     :type limit_gpu: int
-    :param limit_ephemeral_storage: Limit for ephermeral storage
+    :param limit_ephemeral_storage: Limit for ephemeral storage
     :type limit_ephemeral_storage: float | str
     """
     def __init__(
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index e46407b..d11c175 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -344,18 +344,26 @@ class PodGenerator(object):
         resources = namespaced.get('resources')
 
         if resources is None:
-            requests = {
-                'cpu': namespaced.get('request_cpu'),
-                'memory': namespaced.get('request_memory'),
-                'ephemeral-storage': namespaced.get('ephemeral-storage')
-            }
-            limits = {
-                'cpu': namespaced.get('limit_cpu'),
-                'memory': namespaced.get('limit_memory'),
-                'ephemeral-storage': namespaced.get('ephemeral-storage')
-            }
-            all_resources = list(requests.values()) + list(limits.values())
-            if all(r is None for r in all_resources):
+            def extract(cpu, memory, ephemeral_storage, limit_gpu=None):
+                resources_obj = {
+                    'cpu': namespaced.pop(cpu, None),
+                    'memory': namespaced.pop(memory, None),
+                    'ephemeral-storage': namespaced.pop(ephemeral_storage, None),
+                }
+                if limit_gpu is not None:
+                    resources_obj['nvidia.com/gpu'] = namespaced.pop(limit_gpu, None)
+
+                resources_obj = {k: v for k, v in resources_obj.items() if v is not None}
+
+                if all(r is None for r in resources_obj):
+                    resources_obj = None
+                return namespaced, resources_obj
+
+            namespaced, requests = extract('request_cpu', 'request_memory', 'request_ephemeral_storage')
+            namespaced, limits = extract('limit_cpu', 'limit_memory', 'limit_ephemeral_storage',
+                                         limit_gpu='limit_gpu')
+
+            if requests is None and limits is None:
                 resources = None
             else:
                 resources = k8s.V1ResourceRequirements(
@@ -371,6 +379,7 @@ class PodGenerator(object):
                 'iam.cloud.google.com/service-account': gcp_service_account_key
             })
 
+        namespaced['resources'] = resources
         return PodGenerator(**namespaced).gen_pod()
 
     @staticmethod
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index 7d39cdc..d0faf4c 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -288,6 +288,138 @@ class TestPodGenerator(unittest.TestCase):
         }, result)
 
     @mock.patch('uuid.uuid4')
+    def test_from_obj_with_resources(self, mock_uuid):
+        self.maxDiff = None
+
+        mock_uuid.return_value = self.static_uuid
+        result = PodGenerator.from_obj({
+            "KubernetesExecutor": {
+                "annotations": {"test": "annotation"},
+                "volumes": [
+                    {
+                        "name": "example-kubernetes-test-volume",
+                        "hostPath": {"path": "/tmp/"},
+                    },
+                ],
+                "volume_mounts": [
+                    {
+                        "mountPath": "/foo/",
+                        "name": "example-kubernetes-test-volume",
+                    },
+                ],
+                'request_cpu': "200m",
+                'limit_cpu': "400m",
+                'request_memory': "500Mi",
+                'limit_memory': "1000Mi",
+                'limit_gpu': "2",
+                'request_ephemeral_storage': '2Gi',
+                'limit_ephemeral_storage': '4Gi',
+            }
+        })
+        result = self.k8s_client.sanitize_for_serialization(result)
+
+        self.assertEqual({
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': {
+                'annotations': {'test': 'annotation'},
+            },
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': [],
+                    'env': [],
+                    'envFrom': [],
+                    'name': 'base',
+                    'ports': [],
+                    'resources': {
+                        'limits': {
+                            'cpu': '400m',
+                            'ephemeral-storage': '4Gi',
+                            'memory': '1000Mi',
+                            'nvidia.com/gpu': "2",
+                        },
+                        'requests': {
+                            'cpu': '200m',
+                            'ephemeral-storage': '2Gi',
+                            'memory': '500Mi',
+                        },
+                    },
+                    'volumeMounts': [{
+                        'mountPath': '/foo/',
+                        'name': 'example-kubernetes-test-volume'
+                    }],
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'volumes': [{
+                    'hostPath': {'path': '/tmp/'},
+                    'name': 'example-kubernetes-test-volume'
+                }],
+            }
+        }, result)
+
+    @mock.patch('uuid.uuid4')
+    def test_from_obj_with_only_request_resources(self, mock_uuid):
+        self.maxDiff = None
+
+        mock_uuid.return_value = self.static_uuid
+        result = PodGenerator.from_obj({
+            "KubernetesExecutor": {
+                "annotations": {"test": "annotation"},
+                "volumes": [
+                    {
+                        "name": "example-kubernetes-test-volume",
+                        "hostPath": {"path": "/tmp/"},
+                    },
+                ],
+                "volume_mounts": [
+                    {
+                        "mountPath": "/foo/",
+                        "name": "example-kubernetes-test-volume",
+                    },
+                ],
+                'request_cpu': "200m",
+                'request_memory': "500Mi",
+            }
+        })
+        result = self.k8s_client.sanitize_for_serialization(result)
+
+        self.assertEqual({
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': {
+                'annotations': {'test': 'annotation'},
+            },
+            'spec': {
+                'containers': [{
+                    'args': [],
+                    'command': [],
+                    'env': [],
+                    'envFrom': [],
+                    'name': 'base',
+                    'ports': [],
+                    'resources': {
+                        'requests': {
+                            'cpu': '200m',
+                            'memory': '500Mi',
+                        },
+                    },
+                    'volumeMounts': [{
+                        'mountPath': '/foo/',
+                        'name': 'example-kubernetes-test-volume'
+                    }],
+                }],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'volumes': [{
+                    'hostPath': {'path': '/tmp/'},
+                    'name': 'example-kubernetes-test-volume'
+                }],
+            }
+        }, result)
+
+    @mock.patch('uuid.uuid4')
     def test_reconcile_pods_empty_mutator_pod(self, mock_uuid):
         mock_uuid.return_value = self.static_uuid
         base_pod = PodGenerator(