You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/06/25 22:07:32 UTC
[airflow] branch v1-10-test updated: [AIRFLOW-5659] Add support for
ephemeral storage on KubernetesPodOperator (#6337)
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new 5e238f4 [AIRFLOW-5659] Add support for ephemeral storage on KubernetesPodOperator (#6337)
5e238f4 is described below
commit 5e238f4b3da793a1211c8d74cb4f4032e4b926f8
Author: Leonardo Alves Miguel <le...@gmail.com>
AuthorDate: Thu Feb 27 07:25:24 2020 -0300
[AIRFLOW-5659] Add support for ephemeral storage on KubernetesPodOperator (#6337)
(cherry picked from commit dfb18adaf53fa12f1b10b1283321b1dd71211059)
---
airflow/kubernetes/pod.py | 31 ++++++++++++++++++++----
airflow/kubernetes/pod_generator.py | 6 +++--
kubernetes_tests/test_kubernetes_pod_operator.py | 4 +++
tests/kubernetes/test_pod_generator.py | 9 ++++---
4 files changed, 40 insertions(+), 10 deletions(-)
diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index 6a0e788..b1df462 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -33,25 +33,33 @@ class Resources(K8SModel):
:type request_memory: str
:param request_cpu: requested CPU number
:type request_cpu: float | str
+ :param request_ephemeral_storage: requested ephermeral storage
+ :type request_ephemeral_storage: str
:param limit_memory: limit for memory usage
:type limit_memory: str
:param limit_cpu: Limit for CPU used
:type limit_cpu: float | str
:param limit_gpu: Limits for GPU used
:type limit_gpu: int
+ :param limit_ephemeral_storage: Limit for ephermeral storage
+ :type limit_ephemeral_storage: float | str
"""
def __init__(
self,
request_memory=None,
request_cpu=None,
+ request_ephemeral_storage=None,
limit_memory=None,
limit_cpu=None,
- limit_gpu=None):
+ limit_gpu=None,
+ limit_ephemeral_storage=None):
self.request_memory = request_memory
self.request_cpu = request_cpu
+ self.request_ephemeral_storage = request_ephemeral_storage
self.limit_memory = limit_memory
self.limit_cpu = limit_cpu
self.limit_gpu = limit_gpu
+ self.limit_ephemeral_storage = limit_ephemeral_storage
def is_empty_resource_request(self):
"""Whether resource is empty"""
@@ -59,16 +67,29 @@ class Resources(K8SModel):
def has_limits(self):
"""Whether resource has limits"""
- return self.limit_cpu is not None or self.limit_memory is not None or self.limit_gpu is not None
+ return self.limit_cpu is not None or \
+ self.limit_memory is not None or \
+ self.limit_gpu is not None or \
+ self.limit_ephemeral_storage is not None
def has_requests(self):
"""Whether resource has requests"""
- return self.request_cpu is not None or self.request_memory is not None
+ return self.request_cpu is not None or \
+ self.request_memory is not None or \
+ self.request_ephemeral_storage is not None
def to_k8s_client_obj(self):
return k8s.V1ResourceRequirements(
- limits={'cpu': self.limit_cpu, 'memory': self.limit_memory, 'nvidia.com/gpu': self.limit_gpu},
- requests={'cpu': self.request_cpu, 'memory': self.request_memory}
+ limits={
+ 'cpu': self.limit_cpu,
+ 'memory': self.limit_memory,
+ 'nvidia.com/gpu': self.limit_gpu,
+ 'ephemeral-storage': self.limit_ephemeral_storage
+ },
+ requests={
+ 'cpu': self.request_cpu,
+ 'memory': self.request_memory,
+ 'ephemeral-storage': self.request_ephemeral_storage}
)
def attach_to_pod(self, pod):
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 6956fc3..711b1a9 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -323,11 +323,13 @@ class PodGenerator:
if resources is None:
requests = {
'cpu': namespaced.get('request_cpu'),
- 'memory': namespaced.get('request_memory')
+ 'memory': namespaced.get('request_memory'),
+ 'ephemeral-storage': namespaced.get('ephemeral-storage')
}
limits = {
'cpu': namespaced.get('limit_cpu'),
- 'memory': namespaced.get('limit_memory')
+ 'memory': namespaced.get('limit_memory'),
+ 'ephemeral-storage': namespaced.get('ephemeral-storage')
}
all_resources = list(requests.values()) + list(limits.values())
if all(r is None for r in all_resources):
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 0a129b6..e20324b 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -375,8 +375,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
resources = {
'limit_cpu': 0.25,
'limit_memory': '64Mi',
+ 'limit_ephemeral_storage': '2Gi',
'request_cpu': '250m',
'request_memory': '64Mi',
+ 'request_ephemeral_storage': '1Gi',
}
k = KubernetesPodOperator(
namespace='default',
@@ -397,11 +399,13 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
'requests': {
'memory': '64Mi',
'cpu': '250m',
+ 'ephemeral-storage': '1Gi'
},
'limits': {
'memory': '64Mi',
'cpu': 0.25,
'nvidia.com/gpu': None,
+ 'ephemeral-storage': '2Gi'
}
}
self.assertEqual(self.expected_pod, actual_pod)
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index a9a3aa5..ce15a8b 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -43,6 +43,7 @@ class TestPodGenerator(unittest.TestCase):
# This should produce a single secret mounted in env
Secret('env', 'TARGET', 'secret_b', 'source_b'),
]
+
self.labels = {
'airflow-worker': 'uuid',
'dag_id': 'dag_id',
@@ -58,7 +59,7 @@ class TestPodGenerator(unittest.TestCase):
'namespace': 'namespace'
}
- self.resources = Resources('1Gi', 1, '2Gi', 2, 1)
+ self.resources = Resources('1Gi', 1, '2Gi', '2Gi', 2, 1, '4Gi')
self.k8s_client = ApiClient()
self.expected = {
'apiVersion': 'v1',
@@ -108,12 +109,14 @@ class TestPodGenerator(unittest.TestCase):
'resources': {
'requests': {
'memory': '1Gi',
- 'cpu': 1
+ 'cpu': 1,
+ 'ephemeral-storage': '2Gi'
},
'limits': {
'memory': '2Gi',
'cpu': 2,
- 'nvidia.com/gpu': 1
+ 'nvidia.com/gpu': 1,
+ 'ephemeral-storage': '4Gi'
},
},
'ports': [{'name': 'foo', 'containerPort': 1234}],