You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/11 22:35:00 UTC
[airflow] 19/32: Fix bug in executor_config when defining resources
(#9935)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 05ec21a22f84cdbe2aaed38b712c30f2cbb38b59
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Thu Jul 23 19:52:20 2020 -0700
Fix bug in executor_config when defining resources (#9935)
* Fix PodGenerator to handle Kubernetes resources
In Airflow 1.10.11, `namespaced['resources'] = resources` is missing.
This PR improves the definition of pod resources, `requests` and `limits` are optional.
* Make it working in 2.7
* Add limit_gpu and fix ephemeral-storage keys
* Fix flake8
Co-authored-by: Riccardo Bini <od...@gmail.com>
---
airflow/kubernetes/pod.py | 4 +-
airflow/kubernetes/pod_generator.py | 33 ++++++---
tests/kubernetes/test_pod_generator.py | 132 +++++++++++++++++++++++++++++++++
3 files changed, 155 insertions(+), 14 deletions(-)
diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index b1df462..0b332c2 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -33,7 +33,7 @@ class Resources(K8SModel):
:type request_memory: str
:param request_cpu: requested CPU number
:type request_cpu: float | str
- :param request_ephemeral_storage: requested ephermeral storage
+ :param request_ephemeral_storage: requested ephemeral storage
:type request_ephemeral_storage: str
:param limit_memory: limit for memory usage
:type limit_memory: str
@@ -41,7 +41,7 @@ class Resources(K8SModel):
:type limit_cpu: float | str
:param limit_gpu: Limits for GPU used
:type limit_gpu: int
- :param limit_ephemeral_storage: Limit for ephermeral storage
+ :param limit_ephemeral_storage: Limit for ephemeral storage
:type limit_ephemeral_storage: float | str
"""
def __init__(
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index e46407b..d11c175 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -344,18 +344,26 @@ class PodGenerator(object):
resources = namespaced.get('resources')
if resources is None:
- requests = {
- 'cpu': namespaced.get('request_cpu'),
- 'memory': namespaced.get('request_memory'),
- 'ephemeral-storage': namespaced.get('ephemeral-storage')
- }
- limits = {
- 'cpu': namespaced.get('limit_cpu'),
- 'memory': namespaced.get('limit_memory'),
- 'ephemeral-storage': namespaced.get('ephemeral-storage')
- }
- all_resources = list(requests.values()) + list(limits.values())
- if all(r is None for r in all_resources):
+ def extract(cpu, memory, ephemeral_storage, limit_gpu=None):
+ resources_obj = {
+ 'cpu': namespaced.pop(cpu, None),
+ 'memory': namespaced.pop(memory, None),
+ 'ephemeral-storage': namespaced.pop(ephemeral_storage, None),
+ }
+ if limit_gpu is not None:
+ resources_obj['nvidia.com/gpu'] = namespaced.pop(limit_gpu, None)
+
+ resources_obj = {k: v for k, v in resources_obj.items() if v is not None}
+
+ if all(r is None for r in resources_obj):
+ resources_obj = None
+ return namespaced, resources_obj
+
+ namespaced, requests = extract('request_cpu', 'request_memory', 'request_ephemeral_storage')
+ namespaced, limits = extract('limit_cpu', 'limit_memory', 'limit_ephemeral_storage',
+ limit_gpu='limit_gpu')
+
+ if requests is None and limits is None:
resources = None
else:
resources = k8s.V1ResourceRequirements(
@@ -371,6 +379,7 @@ class PodGenerator(object):
'iam.cloud.google.com/service-account': gcp_service_account_key
})
+ namespaced['resources'] = resources
return PodGenerator(**namespaced).gen_pod()
@staticmethod
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index 7d39cdc..d0faf4c 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -288,6 +288,138 @@ class TestPodGenerator(unittest.TestCase):
}, result)
@mock.patch('uuid.uuid4')
+ def test_from_obj_with_resources(self, mock_uuid):
+ self.maxDiff = None
+
+ mock_uuid.return_value = self.static_uuid
+ result = PodGenerator.from_obj({
+ "KubernetesExecutor": {
+ "annotations": {"test": "annotation"},
+ "volumes": [
+ {
+ "name": "example-kubernetes-test-volume",
+ "hostPath": {"path": "/tmp/"},
+ },
+ ],
+ "volume_mounts": [
+ {
+ "mountPath": "/foo/",
+ "name": "example-kubernetes-test-volume",
+ },
+ ],
+ 'request_cpu': "200m",
+ 'limit_cpu': "400m",
+ 'request_memory': "500Mi",
+ 'limit_memory': "1000Mi",
+ 'limit_gpu': "2",
+ 'request_ephemeral_storage': '2Gi',
+ 'limit_ephemeral_storage': '4Gi',
+ }
+ })
+ result = self.k8s_client.sanitize_for_serialization(result)
+
+ self.assertEqual({
+ 'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {
+ 'annotations': {'test': 'annotation'},
+ },
+ 'spec': {
+ 'containers': [{
+ 'args': [],
+ 'command': [],
+ 'env': [],
+ 'envFrom': [],
+ 'name': 'base',
+ 'ports': [],
+ 'resources': {
+ 'limits': {
+ 'cpu': '400m',
+ 'ephemeral-storage': '4Gi',
+ 'memory': '1000Mi',
+ 'nvidia.com/gpu': "2",
+ },
+ 'requests': {
+ 'cpu': '200m',
+ 'ephemeral-storage': '2Gi',
+ 'memory': '500Mi',
+ },
+ },
+ 'volumeMounts': [{
+ 'mountPath': '/foo/',
+ 'name': 'example-kubernetes-test-volume'
+ }],
+ }],
+ 'hostNetwork': False,
+ 'imagePullSecrets': [],
+ 'volumes': [{
+ 'hostPath': {'path': '/tmp/'},
+ 'name': 'example-kubernetes-test-volume'
+ }],
+ }
+ }, result)
+
+ @mock.patch('uuid.uuid4')
+ def test_from_obj_with_only_request_resources(self, mock_uuid):
+ self.maxDiff = None
+
+ mock_uuid.return_value = self.static_uuid
+ result = PodGenerator.from_obj({
+ "KubernetesExecutor": {
+ "annotations": {"test": "annotation"},
+ "volumes": [
+ {
+ "name": "example-kubernetes-test-volume",
+ "hostPath": {"path": "/tmp/"},
+ },
+ ],
+ "volume_mounts": [
+ {
+ "mountPath": "/foo/",
+ "name": "example-kubernetes-test-volume",
+ },
+ ],
+ 'request_cpu': "200m",
+ 'request_memory': "500Mi",
+ }
+ })
+ result = self.k8s_client.sanitize_for_serialization(result)
+
+ self.assertEqual({
+ 'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {
+ 'annotations': {'test': 'annotation'},
+ },
+ 'spec': {
+ 'containers': [{
+ 'args': [],
+ 'command': [],
+ 'env': [],
+ 'envFrom': [],
+ 'name': 'base',
+ 'ports': [],
+ 'resources': {
+ 'requests': {
+ 'cpu': '200m',
+ 'memory': '500Mi',
+ },
+ },
+ 'volumeMounts': [{
+ 'mountPath': '/foo/',
+ 'name': 'example-kubernetes-test-volume'
+ }],
+ }],
+ 'hostNetwork': False,
+ 'imagePullSecrets': [],
+ 'volumes': [{
+ 'hostPath': {'path': '/tmp/'},
+ 'name': 'example-kubernetes-test-volume'
+ }],
+ }
+ }, result)
+
+ @mock.patch('uuid.uuid4')
def test_reconcile_pods_empty_mutator_pod(self, mock_uuid):
mock_uuid.return_value = self.static_uuid
base_pod = PodGenerator(