You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/25 22:07:32 UTC

[airflow] branch v1-10-test updated: [AIRFLOW-5659] Add support for ephemeral storage on KubernetesPodOperator (#6337)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 5e238f4  [AIRFLOW-5659] Add support for ephemeral storage on KubernetesPodOperator (#6337)
5e238f4 is described below

commit 5e238f4b3da793a1211c8d74cb4f4032e4b926f8
Author: Leonardo Alves Miguel <le...@gmail.com>
AuthorDate: Thu Feb 27 07:25:24 2020 -0300

    [AIRFLOW-5659] Add support for ephemeral storage on KubernetesPodOperator (#6337)
    
    (cherry picked from commit dfb18adaf53fa12f1b10b1283321b1dd71211059)
---
 airflow/kubernetes/pod.py                        | 31 ++++++++++++++++++++----
 airflow/kubernetes/pod_generator.py              |  6 +++--
 kubernetes_tests/test_kubernetes_pod_operator.py |  4 +++
 tests/kubernetes/test_pod_generator.py           |  9 ++++---
 4 files changed, 40 insertions(+), 10 deletions(-)

diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index 6a0e788..b1df462 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -33,25 +33,33 @@ class Resources(K8SModel):
     :type request_memory: str
     :param request_cpu: requested CPU number
     :type request_cpu: float | str
+    :param request_ephemeral_storage: requested ephemeral storage
+    :type request_ephemeral_storage: str
     :param limit_memory: limit for memory usage
     :type limit_memory: str
     :param limit_cpu: Limit for CPU used
     :type limit_cpu: float | str
     :param limit_gpu: Limits for GPU used
     :type limit_gpu: int
+    :param limit_ephemeral_storage: Limit for ephemeral storage
+    :type limit_ephemeral_storage: float | str
     """
     def __init__(
             self,
             request_memory=None,
             request_cpu=None,
+            request_ephemeral_storage=None,
             limit_memory=None,
             limit_cpu=None,
-            limit_gpu=None):
+            limit_gpu=None,
+            limit_ephemeral_storage=None):
         self.request_memory = request_memory
         self.request_cpu = request_cpu
+        self.request_ephemeral_storage = request_ephemeral_storage
         self.limit_memory = limit_memory
         self.limit_cpu = limit_cpu
         self.limit_gpu = limit_gpu
+        self.limit_ephemeral_storage = limit_ephemeral_storage
 
     def is_empty_resource_request(self):
         """Whether resource is empty"""
@@ -59,16 +67,29 @@ class Resources(K8SModel):
 
     def has_limits(self):
         """Whether resource has limits"""
-        return self.limit_cpu is not None or self.limit_memory is not None or self.limit_gpu is not None
+        return self.limit_cpu is not None or \
+            self.limit_memory is not None or \
+            self.limit_gpu is not None or \
+            self.limit_ephemeral_storage is not None
 
     def has_requests(self):
         """Whether resource has requests"""
-        return self.request_cpu is not None or self.request_memory is not None
+        return self.request_cpu is not None or \
+            self.request_memory is not None or \
+            self.request_ephemeral_storage is not None
 
     def to_k8s_client_obj(self):
         return k8s.V1ResourceRequirements(
-            limits={'cpu': self.limit_cpu, 'memory': self.limit_memory, 'nvidia.com/gpu': self.limit_gpu},
-            requests={'cpu': self.request_cpu, 'memory': self.request_memory}
+            limits={
+                'cpu': self.limit_cpu,
+                'memory': self.limit_memory,
+                'nvidia.com/gpu': self.limit_gpu,
+                'ephemeral-storage': self.limit_ephemeral_storage
+            },
+            requests={
+                'cpu': self.request_cpu,
+                'memory': self.request_memory,
+                'ephemeral-storage': self.request_ephemeral_storage}
         )
 
     def attach_to_pod(self, pod):
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 6956fc3..711b1a9 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -323,11 +323,13 @@ class PodGenerator:
         if resources is None:
             requests = {
                 'cpu': namespaced.get('request_cpu'),
-                'memory': namespaced.get('request_memory')
+                'memory': namespaced.get('request_memory'),
+                'ephemeral-storage': namespaced.get('ephemeral-storage')
             }
             limits = {
                 'cpu': namespaced.get('limit_cpu'),
-                'memory': namespaced.get('limit_memory')
+                'memory': namespaced.get('limit_memory'),
+                'ephemeral-storage': namespaced.get('ephemeral-storage')
             }
             all_resources = list(requests.values()) + list(limits.values())
             if all(r is None for r in all_resources):
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 0a129b6..e20324b 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -375,8 +375,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         resources = {
             'limit_cpu': 0.25,
             'limit_memory': '64Mi',
+            'limit_ephemeral_storage': '2Gi',
             'request_cpu': '250m',
             'request_memory': '64Mi',
+            'request_ephemeral_storage': '1Gi',
         }
         k = KubernetesPodOperator(
             namespace='default',
@@ -397,11 +399,13 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             'requests': {
                 'memory': '64Mi',
                 'cpu': '250m',
+                'ephemeral-storage': '1Gi'
             },
             'limits': {
                 'memory': '64Mi',
                 'cpu': 0.25,
                 'nvidia.com/gpu': None,
+                'ephemeral-storage': '2Gi'
             }
         }
         self.assertEqual(self.expected_pod, actual_pod)
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index a9a3aa5..ce15a8b 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -43,6 +43,7 @@ class TestPodGenerator(unittest.TestCase):
             # This should produce a single secret mounted in env
             Secret('env', 'TARGET', 'secret_b', 'source_b'),
         ]
+
         self.labels = {
             'airflow-worker': 'uuid',
             'dag_id': 'dag_id',
@@ -58,7 +59,7 @@ class TestPodGenerator(unittest.TestCase):
             'namespace': 'namespace'
         }
 
-        self.resources = Resources('1Gi', 1, '2Gi', 2, 1)
+        self.resources = Resources('1Gi', 1, '2Gi', '2Gi', 2, 1, '4Gi')
         self.k8s_client = ApiClient()
         self.expected = {
             'apiVersion': 'v1',
@@ -108,12 +109,14 @@ class TestPodGenerator(unittest.TestCase):
                     'resources': {
                         'requests': {
                             'memory': '1Gi',
-                            'cpu': 1
+                            'cpu': 1,
+                            'ephemeral-storage': '2Gi'
                         },
                         'limits': {
                             'memory': '2Gi',
                             'cpu': 2,
-                            'nvidia.com/gpu': 1
+                            'nvidia.com/gpu': 1,
+                            'ephemeral-storage': '4Gi'
                         },
                     },
                     'ports': [{'name': 'foo', 'containerPort': 1234}],