You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/01 02:26:36 UTC
[airflow] 01/01: Fix more PodMutationHook issues for backwards
compatibility
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch fix-more-pod-mutations-tests
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 3d7b30d5a4f98e3e60731141e4ce42755948c7ce
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Aug 1 03:24:15 2020 +0100
Fix more PodMutationHook issues for backwards compatibility
This PR/commit
- Adds missing affinity from old POD
- Adds comprehensive tests to check pod_mutation_hook works well with both new and old PODs with various configs like volume, volumeMounts, Ports, affinity, tolerations etc
- Refactors various parts of k8s code
---
airflow/contrib/kubernetes/pod.py | 74 ++++++++-
airflow/kubernetes/pod.py | 2 +-
airflow/kubernetes/pod_launcher.py | 40 +++--
airflow/kubernetes/pod_launcher_helper.py | 96 -----------
tests/kubernetes/test_pod_launcher.py | 81 +++++++++-
tests/kubernetes/test_pod_launcher_helper.py | 98 -----------
tests/test_local_settings.py | 232 ++++++++++++++++++++++-----
7 files changed, 376 insertions(+), 247 deletions(-)
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index 0ab3616..f2ce056 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -19,7 +19,13 @@
import warnings
# pylint: disable=unused-import
-from airflow.kubernetes.pod import Port, Resources # noqa
+from typing import List, Union
+
+from kubernetes.client import models as k8s
+
+from airflow.kubernetes.pod import Port, Resources # noqa
+from airflow.kubernetes.volume import Volume
+from airflow.kubernetes.volume_mount import VolumeMount
warnings.warn(
"This module is deprecated. Please use `airflow.kubernetes.pod`.",
@@ -154,6 +160,7 @@ class Pod(object):
dns_policy=self.dnspolicy,
host_network=self.hostnetwork,
tolerations=self.tolerations,
+ affinity=self.affinity,
security_context=self.security_context,
)
@@ -161,11 +168,11 @@ class Pod(object):
spec=spec,
metadata=meta,
)
- for port in self.ports:
+ for port in _extract_ports(self.ports):
pod = port.attach_to_pod(pod)
- for volume in self.volumes:
+ for volume in _extract_volumes(self.volumes):
pod = volume.attach_to_pod(pod)
- for volume_mount in self.volume_mounts:
+ for volume_mount in _extract_volume_mounts(self.volume_mounts):
pod = volume_mount.attach_to_pod(pod)
for secret in self.secrets:
pod = secret.attach_to_pod(pod)
@@ -182,3 +189,62 @@ class Pod(object):
res['volumes'] = [volume.as_dict() for volume in res['volumes']]
return res
+
+
+def _extract_env_vars(env_vars):
+ result = {}
+ env_vars = env_vars or [] # type: List[Union[k8s.V1EnvVar, dict]]
+ for env_var in env_vars:
+ if isinstance(env_var, k8s.V1EnvVar):
+ env_var.to_dict()
+ result[env_var.get("name")] = env_var.get("value")
+ return result
+
+
+def _extract_ports(ports):
+ result = []
+ ports = ports or [] # type: List[Union[k8s.V1ContainerPort, dict]]
+ for port in ports:
+ if isinstance(port, k8s.V1ContainerPort):
+ port = port.to_dict()
+ port = Port(name=port.get("name"), container_port=port.get("container_port"))
+ if not isinstance(port, Port):
+ port = Port(name=port.get("name"), container_port=port.get("containerPort"))
+ result.append(port)
+ return result
+
+
+def _extract_volume_mounts(volume_mounts):
+ result = []
+ volume_mounts = volume_mounts or [] # type: List[Union[k8s.V1VolumeMount, dict]]
+ for volume_mount in volume_mounts:
+ if isinstance(volume_mount, k8s.V1VolumeMount):
+ volume_mount = volume_mount.to_dict()
+ volume_mount = VolumeMount(
+ name=volume_mount.get("name"),
+ mount_path=volume_mount.get("mount_path"),
+ sub_path=volume_mount.get("sub_path"),
+ read_only=volume_mount.get("read_only")
+ )
+ elif not isinstance(volume_mount, VolumeMount):
+ volume_mount = VolumeMount(
+ name=volume_mount.get("name"),
+ mount_path=volume_mount.get("mountPath"),
+ sub_path=volume_mount.get("subPath"),
+ read_only=volume_mount.get("readOnly")
+ )
+
+ result.append(volume_mount)
+ return result
+
+
+def _extract_volumes(volumes):
+ result = []
+ volumes = volumes or [] # type: List[Union[k8s.V1Volume, dict]]
+ for volume in volumes:
+ if isinstance(volume, k8s.V1Volume):
+ volume = volume.to_dict()
+ if not isinstance(volume, Volume):
+ volume = Volume(name=volume.get("name"), configs=volume)
+ result.append(volume)
+ return result
diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index 9e455af..c1854a1 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -20,7 +20,7 @@ Classes for interacting with Kubernetes API
import copy
-import kubernetes.client.models as k8s
+from kubernetes.client import models as k8s
from airflow.kubernetes.k8s_model import K8SModel
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index d6507df..dc75f8a 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -27,13 +27,15 @@ from kubernetes.stream import stream as kubernetes_stream
from requests.exceptions import BaseHTTPError
from airflow import AirflowException
-from airflow.kubernetes.pod_launcher_helper import convert_to_airflow_pod
from airflow.kubernetes.pod_generator import PodDefaults
from airflow import settings
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
import kubernetes.client.models as k8s # noqa
from .kube_client import get_kube_client
+from ..contrib.kubernetes.pod import (
+ Pod, _extract_env_vars, _extract_volumes, _extract_volume_mounts, _extract_ports
+)
class PodStatus:
@@ -90,19 +92,17 @@ class PodLauncher(LoggingMixin):
def _mutate_pod_backcompat(pod):
"""Backwards compatible Pod Mutation Hook"""
try:
- settings.pod_mutation_hook(pod)
- # attempts to run pod_mutation_hook using k8s.V1Pod, if this
- # fails we attempt to run by converting pod to Old Pod
- except AttributeError:
+ dummy_pod = _convert_to_airflow_pod(pod)
+ settings.pod_mutation_hook(dummy_pod)
warnings.warn(
"Using `airflow.contrib.kubernetes.pod.Pod` is deprecated. "
"Please use `k8s.V1Pod` instead.", DeprecationWarning, stacklevel=2
)
- dummy_pod = convert_to_airflow_pod(pod)
- settings.pod_mutation_hook(dummy_pod)
dummy_pod = dummy_pod.to_v1_kubernetes_pod()
- return dummy_pod
- return pod
+ except AttributeError:
+ settings.pod_mutation_hook(pod)
+ return pod
+ return dummy_pod
def delete_pod(self, pod):
"""Deletes POD"""
@@ -269,7 +269,7 @@ class PodLauncher(LoggingMixin):
return None
def process_status(self, job_id, status):
- """Process status infomration for the JOB"""
+ """Process status information for the JOB"""
status = status.lower()
if status == PodStatus.PENDING:
return State.QUEUED
@@ -284,3 +284,23 @@ class PodLauncher(LoggingMixin):
else:
self.log.info('Event: Invalid state %s on job %s', status, job_id)
return State.FAILED
+
+
+def _convert_to_airflow_pod(pod):
+ base_container = pod.spec.containers[0] # type: k8s.V1Container
+
+ dummy_pod = Pod(
+ image=base_container.image,
+ envs=_extract_env_vars(base_container.env),
+ volumes=_extract_volumes(pod.spec.volumes),
+ volume_mounts=_extract_volume_mounts(base_container.volume_mounts),
+ labels=pod.metadata.labels,
+ name=pod.metadata.name,
+ namespace=pod.metadata.namespace,
+ image_pull_policy=base_container.image_pull_policy or 'IfNotPresent',
+ cmds=[],
+ ports=_extract_ports(base_container.ports),
+ tolerations=pod.spec.tolerations,
+ affinity=pod.spec.affinity
+ )
+ return dummy_pod
diff --git a/airflow/kubernetes/pod_launcher_helper.py b/airflow/kubernetes/pod_launcher_helper.py
deleted file mode 100644
index 8c9fc6e..0000000
--- a/airflow/kubernetes/pod_launcher_helper.py
+++ /dev/null
@@ -1,96 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from typing import List, Union
-
-import kubernetes.client.models as k8s # noqa
-
-from airflow.kubernetes.volume import Volume
-from airflow.kubernetes.volume_mount import VolumeMount
-from airflow.kubernetes.pod import Port
-from airflow.contrib.kubernetes.pod import Pod
-
-
-def convert_to_airflow_pod(pod):
- base_container = pod.spec.containers[0] # type: k8s.V1Container
-
- dummy_pod = Pod(
- image=base_container.image,
- envs=_extract_env_vars(base_container.env),
- volumes=_extract_volumes(pod.spec.volumes),
- volume_mounts=_extract_volume_mounts(base_container.volume_mounts),
- labels=pod.metadata.labels,
- name=pod.metadata.name,
- namespace=pod.metadata.namespace,
- image_pull_policy=base_container.image_pull_policy or 'IfNotPresent',
- cmds=[],
- ports=_extract_ports(base_container.ports)
- )
- return dummy_pod
-
-
-def _extract_env_vars(env_vars):
- """
-
- :param env_vars:
- :type env_vars: list
- :return: result
- :rtype: dict
- """
- result = {}
- env_vars = env_vars or [] # type: List[Union[k8s.V1EnvVar, dict]]
- for env_var in env_vars:
- if isinstance(env_var, k8s.V1EnvVar):
- env_var.to_dict()
- result[env_var.get("name")] = env_var.get("value")
- return result
-
-
-def _extract_volumes(volumes):
- result = []
- volumes = volumes or [] # type: List[Union[k8s.V1Volume, dict]]
- for volume in volumes:
- if isinstance(volume, k8s.V1Volume):
- volume = volume.to_dict()
- result.append(Volume(name=volume.get("name"), configs=volume))
- return result
-
-
-def _extract_volume_mounts(volume_mounts):
- result = []
- volume_mounts = volume_mounts or [] # type: List[Union[k8s.V1VolumeMount, dict]]
- for volume_mount in volume_mounts:
- if isinstance(volume_mount, k8s.V1VolumeMount):
- volume_mount = volume_mount.to_dict()
- result.append(
- VolumeMount(
- name=volume_mount.get("name"),
- mount_path=volume_mount.get("mount_path"),
- sub_path=volume_mount.get("sub_path"),
- read_only=volume_mount.get("read_only"))
- )
-
- return result
-
-
-def _extract_ports(ports):
- result = []
- ports = ports or [] # type: List[Union[k8s.V1ContainerPort, dict]]
- for port in ports:
- if isinstance(port, k8s.V1ContainerPort):
- port = port.to_dict()
- result.append(Port(name=port.get("name"), container_port=port.get("container_port")))
- return result
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index 09ba339..e7e9a44 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -16,11 +16,16 @@
# under the License.
import unittest
import mock
+from kubernetes.client import models as k8s
from requests.exceptions import BaseHTTPError
from airflow import AirflowException
-from airflow.kubernetes.pod_launcher import PodLauncher
+from airflow.contrib.kubernetes.pod import Pod
+from airflow.kubernetes.pod import Port
+from airflow.kubernetes.pod_launcher import PodLauncher, _convert_to_airflow_pod
+from airflow.kubernetes.volume import Volume
+from airflow.kubernetes.volume_mount import VolumeMount
class TestPodLauncher(unittest.TestCase):
@@ -162,3 +167,77 @@ class TestPodLauncher(unittest.TestCase):
self.pod_launcher.read_pod,
mock.sentinel
)
+
+
+class TestPodLauncherHelper(unittest.TestCase):
+ def test_convert_to_airflow_pod(self):
+ input_pod = k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(
+ name="foo",
+ namespace="bar"
+ ),
+ spec=k8s.V1PodSpec(
+ containers=[
+ k8s.V1Container(
+ name="base",
+ command="foo",
+ image="myimage",
+ ports=[
+ k8s.V1ContainerPort(
+ name="myport",
+ container_port=8080,
+ )
+ ],
+ volume_mounts=[k8s.V1VolumeMount(
+ name="mymount",
+ mount_path="/tmp/mount",
+ read_only="True"
+ )]
+ )
+ ],
+ volumes=[
+ k8s.V1Volume(
+ name="myvolume"
+ )
+ ]
+ )
+ )
+ result_pod = _convert_to_airflow_pod(input_pod)
+
+ expected = Pod(
+ name="foo",
+ namespace="bar",
+ envs={},
+ cmds=[],
+ image="myimage",
+ ports=[
+ Port(name="myport", container_port=8080)
+ ],
+ volume_mounts=[VolumeMount(
+ name="mymount",
+ mount_path="/tmp/mount",
+ sub_path=None,
+ read_only="True"
+ )],
+ volumes=[Volume(name="myvolume", configs={'name': 'myvolume'})]
+ )
+ expected_dict = expected.as_dict()
+ result_dict = result_pod.as_dict()
+ parsed_configs = self.pull_out_volumes(result_dict)
+ result_dict['volumes'] = parsed_configs
+ self.maxDiff = None
+
+ self.assertDictEqual(expected_dict, result_dict)
+
+ @staticmethod
+ def pull_out_volumes(result_dict):
+ parsed_configs = []
+ for volume in result_dict['volumes']:
+ vol = {'name': volume['name']}
+ confs = {}
+ for k, v in volume['configs'].items():
+ if v and k[0] != '_':
+ confs[k] = v
+ vol['configs'] = confs
+ parsed_configs.append(vol)
+ return parsed_configs
diff --git a/tests/kubernetes/test_pod_launcher_helper.py b/tests/kubernetes/test_pod_launcher_helper.py
deleted file mode 100644
index 761d138..0000000
--- a/tests/kubernetes/test_pod_launcher_helper.py
+++ /dev/null
@@ -1,98 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-import unittest
-
-from airflow.kubernetes.pod import Port
-from airflow.kubernetes.volume_mount import VolumeMount
-from airflow.kubernetes.volume import Volume
-from airflow.kubernetes.pod_launcher_helper import convert_to_airflow_pod
-from airflow.contrib.kubernetes.pod import Pod
-import kubernetes.client.models as k8s
-
-
-class TestPodLauncherHelper(unittest.TestCase):
- def test_convert_to_airflow_pod(self):
- input_pod = k8s.V1Pod(
- metadata=k8s.V1ObjectMeta(
- name="foo",
- namespace="bar"
- ),
- spec=k8s.V1PodSpec(
- containers=[
- k8s.V1Container(
- name="base",
- command="foo",
- image="myimage",
- ports=[
- k8s.V1ContainerPort(
- name="myport",
- container_port=8080,
- )
- ],
- volume_mounts=[k8s.V1VolumeMount(
- name="mymount",
- mount_path="/tmp/mount",
- read_only="True"
- )]
- )
- ],
- volumes=[
- k8s.V1Volume(
- name="myvolume"
- )
- ]
- )
- )
- result_pod = convert_to_airflow_pod(input_pod)
-
- expected = Pod(
- name="foo",
- namespace="bar",
- envs={},
- cmds=[],
- image="myimage",
- ports=[
- Port(name="myport", container_port=8080)
- ],
- volume_mounts=[VolumeMount(
- name="mymount",
- mount_path="/tmp/mount",
- sub_path=None,
- read_only="True"
- )],
- volumes=[Volume(name="myvolume", configs={'name': 'myvolume'})]
- )
- expected_dict = expected.as_dict()
- result_dict = result_pod.as_dict()
- parsed_configs = self.pull_out_volumes(result_dict)
- result_dict['volumes'] = parsed_configs
- self.maxDiff = None
-
- self.assertDictEqual(expected_dict, result_dict)
-
- @staticmethod
- def pull_out_volumes(result_dict):
- parsed_configs = []
- for volume in result_dict['volumes']:
- vol = {'name': volume['name']}
- confs = {}
- for k, v in volume['configs'].items():
- if v and k[0] != '_':
- confs[k] = v
- vol['configs'] = confs
- parsed_configs.append(vol)
- return parsed_configs
diff --git a/tests/test_local_settings.py b/tests/test_local_settings.py
index 0e45ad8..4f1d0b0 100644
--- a/tests/test_local_settings.py
+++ b/tests/test_local_settings.py
@@ -22,7 +22,9 @@ import sys
import tempfile
import unittest
from airflow.kubernetes import pod_generator
-from tests.compat import MagicMock, Mock, call, patch
+from tests.compat import MagicMock, Mock, mock, call, patch
+
+from kubernetes.client.api_client import ApiClient
SETTINGS_FILE_POLICY = """
@@ -48,18 +50,71 @@ def pod_mutation_hook(pod):
pod.namespace = 'airflow-tests'
pod.image = 'my_image'
pod.volumes.append(Volume(name="bar", configs={}))
- pod.ports = [Port(container_port=8080)]
+ pod.ports = [Port(container_port=8080), {"containerPort": 8081}]
pod.resources = Resources(
request_memory="2G",
request_cpu="200Mi",
limit_gpu="200G"
)
+ secret_volume = {
+ "name": "airflow-secrets-mount",
+ "secret": {
+ "secretName": "airflow-test-secrets"
+ }
+ }
+ secret_volume_mount = {
+ "name": "airflow-secrets-mount",
+ "readOnly": True,
+ "mountPath": "/opt/airflow/secrets/"
+ }
+
+ pod.volumes.append(secret_volume)
+ pod.volume_mounts.append(secret_volume_mount)
+
+ pod.labels.update({"test_label": "test_value"})
+ pod.envs.update({"TEST_USER": "ADMIN"})
+
+ pod.tolerations += [
+ {"key": "dynamic-pods", "operator": "Equal", "value": "true", "effect": "NoSchedule"}
+ ]
+ pod.affinity.update(
+ {"nodeAffinity":
+ {"requiredDuringSchedulingIgnoredDuringExecution":
+ {"nodeSelectorTerms":
+ [{
+ "matchExpressions": [
+ {"key": "test/dynamic-pods", "operator": "In", "values": ["true"]}
+ ]
+ }]
+ }
+ }
+ }
+ )
"""
SETTINGS_FILE_POD_MUTATION_HOOK_V1_POD = """
def pod_mutation_hook(pod):
- pod.spec.containers[0].image = "test-image"
+ from kubernetes.client import models as k8s
+ secret_volume = {
+ "name": "airflow-secrets-mount",
+ "secret": {
+ "secretName": "airflow-test-secrets"
+ }
+ }
+ secret_volume_mount = {
+ "name": "airflow-secrets-mount",
+ "readOnly": True,
+ "mountPath": "/opt/airflow/secrets/"
+ }
+ base_container = pod.spec.containers[0]
+ base_container.image = "test-image"
+ base_container.volume_mounts.append(secret_volume_mount)
+ base_container.env.extend([{'name': 'TEST_USER', 'value': 'ADMIN'}])
+ base_container.ports.extend([{'containerPort': 8080}, k8s.V1ContainerPort(container_port=8081)])
+
+ pod.spec.volumes.append(secret_volume)
+ pod.metadata.namespace = 'airflow-tests'
"""
@@ -85,6 +140,7 @@ class LocalSettingsTest(unittest.TestCase):
# Make sure that the configure_logging is not cached
def setUp(self):
self.old_modules = dict(sys.modules)
+ self.maxDiff = None
def tearDown(self):
# Remove any new modules imported during the test run. This lets us
@@ -181,50 +237,101 @@ class LocalSettingsTest(unittest.TestCase):
self.mock_kube_client = Mock()
self.pod_launcher = PodLauncher(kube_client=self.mock_kube_client)
+ self.api_client = ApiClient()
pod = pod_generator.PodGenerator(
image="foo",
name="bar",
namespace="baz",
image_pull_policy="Never",
cmds=["foo"],
+ tolerations=[
+ {'effect': 'NoSchedule',
+ 'key': 'static-pods',
+ 'operator': 'Equal',
+ 'value': 'true'}
+ ],
volume_mounts=[
- {"name": "foo", "mount_path": "/mnt", "sub_path": "/", "read_only": "True"}
+ {"name": "foo", "mountPath": "/mnt", "subPath": "/", "readOnly": True}
],
volumes=[{"name": "foo"}]
).gen_pod()
- self.assertEqual(pod.metadata.namespace, "baz")
- self.assertEqual(pod.spec.containers[0].image, "foo")
- self.assertEqual(pod.spec.volumes, [{'name': 'foo'}])
- self.assertEqual(pod.spec.containers[0].ports, [])
- self.assertEqual(pod.spec.containers[0].resources, None)
+ sanitized_pod_pre_mutation = self.api_client.sanitize_for_serialization(pod)
+ self.assertEqual(
+ sanitized_pod_pre_mutation,
+ {'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {'name': mock.ANY,
+ 'namespace': 'baz'},
+ 'spec': {'containers': [{'args': [],
+ 'command': ['foo'],
+ 'env': [],
+ 'envFrom': [],
+ 'image': 'foo',
+ 'imagePullPolicy': 'Never',
+ 'name': 'base',
+ 'ports': [],
+ 'volumeMounts': [{'mountPath': '/mnt',
+ 'name': 'foo',
+ 'readOnly': True,
+ 'subPath': '/'}]}],
+ 'hostNetwork': False,
+ 'imagePullSecrets': [],
+ 'tolerations': [{'effect': 'NoSchedule',
+ 'key': 'static-pods',
+ 'operator': 'Equal',
+ 'value': 'true'}],
+ 'volumes': [{'name': 'foo'}]}}
+ )
+ # Apply Pod Mutation Hook
pod = self.pod_launcher._mutate_pod_backcompat(pod)
- self.assertEqual(pod.metadata.namespace, "airflow-tests")
- self.assertEqual(pod.spec.containers[0].image, "my_image")
- self.assertEqual(pod.spec.volumes, [{'name': 'foo'}, {'name': 'bar'}])
- self.maxDiff = None
+ sanitized_pod_post_mutation = self.api_client.sanitize_for_serialization(pod)
self.assertEqual(
- pod.spec.containers[0].ports[0].to_dict(),
- {
- "container_port": 8080,
- "host_ip": None,
- "host_port": None,
- "name": None,
- "protocol": None
- }
- )
- self.assertEqual(
- pod.spec.containers[0].resources.to_dict(),
- {
- 'limits': {
- 'cpu': None,
- 'memory': None,
- 'ephemeral-storage': None,
- 'nvidia.com/gpu': '200G'},
- 'requests': {'cpu': '200Mi', 'ephemeral-storage': None, 'memory': '2G'}
- }
+ sanitized_pod_post_mutation,
+ {'metadata': {'labels': {'test_label': 'test_value'},
+ 'name': mock.ANY,
+ 'namespace': 'airflow-tests'},
+ 'spec': {'affinity': {'nodeAffinity': {'requiredDuringSchedulingIgnoredDuringExecution': {
+ 'nodeSelectorTerms': [{'matchExpressions': [{'key': 'test/dynamic-pods',
+ 'operator': 'In',
+ 'values': ['true']}]}]}}},
+ 'containers': [{'args': [],
+ 'command': [],
+ 'env': [{'name': 'TEST_USER', 'value': 'ADMIN'}],
+ 'image': 'my_image',
+ 'imagePullPolicy': 'Never',
+ 'name': 'base',
+ 'ports': [{'containerPort': 8080},
+ {'containerPort': 8081}],
+ 'resources': {'limits': {'cpu': None,
+ 'ephemeral-storage': None,
+ 'memory': None,
+ 'nvidia.com/gpu': '200G'},
+ 'requests': {'cpu': '200Mi',
+ 'ephemeral-storage': None,
+ 'memory': '2G'}},
+ 'volumeMounts': [{'mountPath': '/mnt',
+ 'name': 'foo',
+ 'readOnly': True,
+ 'subPath': '/'},
+ {'mountPath': '/opt/airflow/secrets/',
+ 'name': 'airflow-secrets-mount',
+ 'readOnly': True}]}],
+ 'hostNetwork': False,
+ 'tolerations': [{'effect': 'NoSchedule',
+ 'key': 'static-pods',
+ 'operator': 'Equal',
+ 'value': 'true'},
+ {'effect': 'NoSchedule',
+ 'key': 'dynamic-pods',
+ 'operator': 'Equal',
+ 'value': 'true'}],
+ 'volumes': [{'name': 'foo'},
+ {'name': 'bar'},
+ {'name': 'airflow-secrets-mount',
+ 'secret': {'secretName': 'airflow-test-secrets'}}]}}
)
def test_pod_mutation_v1_pod(self):
@@ -234,19 +341,70 @@ class LocalSettingsTest(unittest.TestCase):
from airflow.kubernetes.pod_launcher import PodLauncher
self.mock_kube_client = Mock()
+ self.api_client = ApiClient()
self.pod_launcher = PodLauncher(kube_client=self.mock_kube_client)
pod = pod_generator.PodGenerator(
image="myimage",
cmds=["foo"],
- volume_mounts={
- "name": "foo", "mount_path": "/mnt", "sub_path": "/", "read_only": "True"
- },
+ namespace="baz",
+ volume_mounts=[
+ {"name": "foo", "mountPath": "/mnt", "subPath": "/", "readOnly": True}
+ ],
volumes=[{"name": "foo"}]
).gen_pod()
- self.assertEqual(pod.spec.containers[0].image, "myimage")
+ sanitized_pod_pre_mutation = self.api_client.sanitize_for_serialization(pod)
+
+ self.assertEqual(
+ sanitized_pod_pre_mutation,
+ {'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {'namespace': 'baz'},
+ 'spec': {'containers': [{'args': [],
+ 'command': ['foo'],
+ 'env': [],
+ 'envFrom': [],
+ 'image': 'myimage',
+ 'name': 'base',
+ 'ports': [],
+ 'volumeMounts': [{'mountPath': '/mnt',
+ 'name': 'foo',
+ 'readOnly': True,
+ 'subPath': '/'}]}],
+ 'hostNetwork': False,
+ 'imagePullSecrets': [],
+ 'volumes': [{'name': 'foo'}]}}
+ )
+
+ # Apply Pod Mutation Hook
pod = self.pod_launcher._mutate_pod_backcompat(pod)
- self.assertEqual(pod.spec.containers[0].image, "test-image")
+
+ sanitized_pod_post_mutation = self.api_client.sanitize_for_serialization(pod)
+ self.assertEqual(
+ sanitized_pod_post_mutation,
+ {'apiVersion': 'v1',
+ 'kind': 'Pod',
+ 'metadata': {'namespace': 'airflow-tests'},
+ 'spec': {'containers': [{'args': [],
+ 'command': ['foo'],
+ 'env': [{'name': 'TEST_USER', 'value': 'ADMIN'}],
+ 'envFrom': [],
+ 'image': 'test-image',
+ 'name': 'base',
+ 'ports': [{'containerPort': 8080}, {'containerPort': 8081}],
+ 'volumeMounts': [{'mountPath': '/mnt',
+ 'name': 'foo',
+ 'readOnly': True,
+ 'subPath': '/'},
+ {'mountPath': '/opt/airflow/secrets/',
+ 'name': 'airflow-secrets-mount',
+ 'readOnly': True}]}],
+ 'hostNetwork': False,
+ 'imagePullSecrets': [],
+ 'volumes': [{'name': 'foo'},
+ {'name': 'airflow-secrets-mount',
+ 'secret': {'secretName': 'airflow-test-secrets'}}]}}
+ )
class TestStatsWithAllowList(unittest.TestCase):