You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/19 19:46:15 UTC
[airflow] 06/06: Fixes issue with affinity backcompat in Airflow
1.10
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b9a275585980bebb647eb27e278b6d3c2cc8331a
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Thu Nov 19 10:29:59 2020 -0800
Fixes issue with affinity backcompat in Airflow 1.10
There was a breaking change in 1.10.12 where the affinity argument
was being turned into a k8s.V1Affinity object instead of a python dict.
This commit solves https://github.com/apache/airflow/issues/11731
---
airflow/kubernetes/pod_launcher.py | 2 +-
tests/kubernetes/test_pod_launcher.py | 319 ++++++++++++++++++++--------------
2 files changed, 189 insertions(+), 132 deletions(-)
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 704a77e..468e077 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -326,7 +326,7 @@ def _convert_to_airflow_pod(pod):
resources=base_container.resources,
service_account_name=pod.spec.service_account_name,
secrets=secrets,
- affinity=pod.spec.affinity,
+ affinity=api_client.sanitize_for_serialization(pod.spec.affinity),
hostnetwork=pod.spec.host_network,
security_context=_extract_security_context(pod.spec.security_context)
)
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index 63169ae..c705f2e 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -30,7 +30,6 @@ from airflow.kubernetes.volume_mount import VolumeMount
class TestPodLauncher(unittest.TestCase):
-
def setUp(self):
self.mock_kube_client = mock.Mock()
self.pod_launcher = PodLauncher(kube_client=self.mock_kube_client)
@@ -44,60 +43,60 @@ class TestPodLauncher(unittest.TestCase):
def test_read_pod_logs_retries_successfully(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.side_effect = [
- BaseHTTPError('Boom'),
- mock.sentinel.logs
+ BaseHTTPError("Boom"),
+ mock.sentinel.logs,
]
logs = self.pod_launcher.read_pod_logs(mock.sentinel)
self.assertEqual(mock.sentinel.logs, logs)
- self.mock_kube_client.read_namespaced_pod_log.assert_has_calls([
- mock.call(
- _preload_content=False,
- container='base',
- follow=True,
- name=mock.sentinel.metadata.name,
- namespace=mock.sentinel.metadata.namespace,
- tail_lines=10
- ),
- mock.call(
- _preload_content=False,
- container='base',
- follow=True,
- name=mock.sentinel.metadata.name,
- namespace=mock.sentinel.metadata.namespace,
- tail_lines=10
- )
- ])
+ self.mock_kube_client.read_namespaced_pod_log.assert_has_calls(
+ [
+ mock.call(
+ _preload_content=False,
+ container="base",
+ follow=True,
+ name=mock.sentinel.metadata.name,
+ namespace=mock.sentinel.metadata.namespace,
+ tail_lines=10,
+ ),
+ mock.call(
+ _preload_content=False,
+ container="base",
+ follow=True,
+ name=mock.sentinel.metadata.name,
+ namespace=mock.sentinel.metadata.namespace,
+ tail_lines=10,
+ ),
+ ]
+ )
def test_read_pod_logs_retries_fails(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.side_effect = [
- BaseHTTPError('Boom'),
- BaseHTTPError('Boom'),
- BaseHTTPError('Boom')
+ BaseHTTPError("Boom"),
+ BaseHTTPError("Boom"),
+ BaseHTTPError("Boom"),
]
self.assertRaises(
- AirflowException,
- self.pod_launcher.read_pod_logs,
- mock.sentinel
+ AirflowException, self.pod_launcher.read_pod_logs, mock.sentinel
)
def test_read_pod_logs_successfully_with_tail_lines(self):
mock.sentinel.metadata = mock.MagicMock()
- self.mock_kube_client.read_namespaced_pod_log.side_effect = [
- mock.sentinel.logs
- ]
+ self.mock_kube_client.read_namespaced_pod_log.side_effect = [mock.sentinel.logs]
logs = self.pod_launcher.read_pod_logs(mock.sentinel, 100)
self.assertEqual(mock.sentinel.logs, logs)
- self.mock_kube_client.read_namespaced_pod_log.assert_has_calls([
- mock.call(
- _preload_content=False,
- container='base',
- follow=True,
- name=mock.sentinel.metadata.name,
- namespace=mock.sentinel.metadata.namespace,
- tail_lines=100
- ),
- ])
+ self.mock_kube_client.read_namespaced_pod_log.assert_has_calls(
+ [
+ mock.call(
+ _preload_content=False,
+ container="base",
+ follow=True,
+ name=mock.sentinel.metadata.name,
+ namespace=mock.sentinel.metadata.namespace,
+ tail_lines=100,
+ ),
+ ]
+ )
def test_read_pod_events_successfully_returns_events(self):
mock.sentinel.metadata = mock.MagicMock()
@@ -108,33 +107,37 @@ class TestPodLauncher(unittest.TestCase):
def test_read_pod_events_retries_successfully(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.list_namespaced_event.side_effect = [
- BaseHTTPError('Boom'),
- mock.sentinel.events
+ BaseHTTPError("Boom"),
+ mock.sentinel.events,
]
events = self.pod_launcher.read_pod_events(mock.sentinel)
self.assertEqual(mock.sentinel.events, events)
- self.mock_kube_client.list_namespaced_event.assert_has_calls([
- mock.call(
- namespace=mock.sentinel.metadata.namespace,
- field_selector="involvedObject.name={}".format(mock.sentinel.metadata.name)
- ),
- mock.call(
- namespace=mock.sentinel.metadata.namespace,
- field_selector="involvedObject.name={}".format(mock.sentinel.metadata.name)
- )
- ])
+ self.mock_kube_client.list_namespaced_event.assert_has_calls(
+ [
+ mock.call(
+ namespace=mock.sentinel.metadata.namespace,
+ field_selector="involvedObject.name={}".format(
+ mock.sentinel.metadata.name
+ ),
+ ),
+ mock.call(
+ namespace=mock.sentinel.metadata.namespace,
+ field_selector="involvedObject.name={}".format(
+ mock.sentinel.metadata.name
+ ),
+ ),
+ ]
+ )
def test_read_pod_events_retries_fails(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.list_namespaced_event.side_effect = [
- BaseHTTPError('Boom'),
- BaseHTTPError('Boom'),
- BaseHTTPError('Boom')
+ BaseHTTPError("Boom"),
+ BaseHTTPError("Boom"),
+ BaseHTTPError("Boom"),
]
self.assertRaises(
- AirflowException,
- self.pod_launcher.read_pod_events,
- mock.sentinel
+ AirflowException, self.pod_launcher.read_pod_events, mock.sentinel
)
def test_read_pod_returns_logs(self):
@@ -146,42 +149,66 @@ class TestPodLauncher(unittest.TestCase):
def test_read_pod_retries_successfully(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod.side_effect = [
- BaseHTTPError('Boom'),
- mock.sentinel.pod_info
+ BaseHTTPError("Boom"),
+ mock.sentinel.pod_info,
]
pod_info = self.pod_launcher.read_pod(mock.sentinel)
self.assertEqual(mock.sentinel.pod_info, pod_info)
- self.mock_kube_client.read_namespaced_pod.assert_has_calls([
- mock.call(mock.sentinel.metadata.name, mock.sentinel.metadata.namespace),
- mock.call(mock.sentinel.metadata.name, mock.sentinel.metadata.namespace)
- ])
+ self.mock_kube_client.read_namespaced_pod.assert_has_calls(
+ [
+ mock.call(
+ mock.sentinel.metadata.name, mock.sentinel.metadata.namespace
+ ),
+ mock.call(
+ mock.sentinel.metadata.name, mock.sentinel.metadata.namespace
+ ),
+ ]
+ )
def test_read_pod_retries_fails(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod.side_effect = [
- BaseHTTPError('Boom'),
- BaseHTTPError('Boom'),
- BaseHTTPError('Boom')
+ BaseHTTPError("Boom"),
+ BaseHTTPError("Boom"),
+ BaseHTTPError("Boom"),
]
- self.assertRaises(
- AirflowException,
- self.pod_launcher.read_pod,
- mock.sentinel
- )
+ self.assertRaises(AirflowException, self.pod_launcher.read_pod, mock.sentinel)
class TestPodLauncherHelper(unittest.TestCase):
def test_convert_to_airflow_pod(self):
input_pod = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
- name="foo",
- namespace="bar"
+ name="foo", namespace="bar", annotations={"foo": "bar"}
),
spec=k8s.V1PodSpec(
+ affinity=k8s.V1Affinity(
+ pod_anti_affinity=k8s.V1PodAntiAffinity(
+ required_during_scheduling_ignored_during_execution=[
+ k8s.V1WeightedPodAffinityTerm(
+ weight=1,
+ pod_affinity_term=k8s.V1PodAffinityTerm(
+ label_selector=k8s.V1LabelSelector(
+ match_expressions=[
+ k8s.V1LabelSelectorRequirement(
+ key="security",
+ operator="In",
+ values="S1",
+ )
+ ]
+ ),
+ topology_key="failure-domain.beta.kubernetes.io/zone",
+ ),
+ )
+ ]
+ )
+ ),
init_containers=[
k8s.V1Container(
name="init-container",
- volume_mounts=[k8s.V1VolumeMount(mount_path="/tmp", name="init-secret")]
+ volume_mounts=[
+ k8s.V1VolumeMount(mount_path="/tmp", name="init-secret")
+ ],
)
],
containers=[
@@ -194,127 +221,157 @@ class TestPodLauncherHelper(unittest.TestCase):
name="AIRFLOW_SECRET",
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
- name="ai",
- key="secret_key"
+ name="ai", key="secret_key"
)
- ))
+ ),
+ )
],
ports=[
- k8s.V1ContainerPort(
- name="myport",
- container_port=8080,
- )
+ k8s.V1ContainerPort(name="myport", container_port=8080,)
],
volume_mounts=[
k8s.V1VolumeMount(
name="myvolume",
mount_path="/tmp/mount",
- read_only="True"
+ read_only="True",
),
k8s.V1VolumeMount(
- name='airflow-config',
- mount_path='/config',
- sub_path='airflow.cfg',
- read_only=True
+ name="airflow-config",
+ mount_path="/config",
+ sub_path="airflow.cfg",
+ read_only=True,
),
k8s.V1VolumeMount(
name="airflow-secret",
mount_path="/opt/mount",
- read_only=True
- )]
+ read_only=True,
+ ),
+ ],
)
],
- security_context=k8s.V1PodSecurityContext(
- run_as_user=0,
- fs_group=0,
- ),
+ security_context=k8s.V1PodSecurityContext(run_as_user=0, fs_group=0,),
image_pull_secrets=[k8s.V1LocalObjectReference("my-secret")],
volumes=[
- k8s.V1Volume(
- name="myvolume"
- ),
+ k8s.V1Volume(name="myvolume"),
k8s.V1Volume(
name="airflow-config",
- config_map=k8s.V1ConfigMap(
- data="airflow-data"
- )
+ config_map=k8s.V1ConfigMap(data="airflow-data"),
),
k8s.V1Volume(
name="airflow-secret",
- secret=k8s.V1SecretVolumeSource(
- secret_name="secret-name",
- )
+ secret=k8s.V1SecretVolumeSource(secret_name="secret-name",),
),
k8s.V1Volume(
name="init-secret",
- secret=k8s.V1SecretVolumeSource(
- secret_name="init-secret",
- )
- )
- ]
- )
+ secret=k8s.V1SecretVolumeSource(secret_name="init-secret",),
+ ),
+ ],
+ ),
)
result_pod = _convert_to_airflow_pod(input_pod)
+ self.assertEqual(type(result_pod.affinity), dict)
+
expected = Pod(
name="foo",
namespace="bar",
+ annotations={"foo": "bar"},
envs={},
init_containers=[
- {'name': 'init-container', 'volumeMounts': [{'mountPath': '/tmp', 'name': 'init-secret'}]}
+ {
+ "name": "init-container",
+ "volumeMounts": [{"mountPath": "/tmp", "name": "init-secret"}],
+ }
],
cmds=["foo"],
image="myimage",
- ports=[
- Port(name="myport", container_port=8080)
- ],
+ ports=[Port(name="myport", container_port=8080)],
volume_mounts=[
VolumeMount(
name="myvolume",
mount_path="/tmp/mount",
sub_path=None,
- read_only="True"
+ read_only="True",
),
VolumeMount(
name="airflow-config",
read_only=True,
mount_path="/config",
- sub_path="airflow.cfg"
+ sub_path="airflow.cfg",
),
VolumeMount(
name="airflow-secret",
mount_path="/opt/mount",
sub_path=None,
- read_only=True
- )],
+ read_only=True,
+ ),
+ ],
image_pull_secrets="my-secret",
+ affinity={
+ "podAntiAffinity": {
+ "requiredDuringSchedulingIgnoredDuringExecution": [
+ {
+ "podAffinityTerm": {
+ "labelSelector": {
+ "matchExpressions": [
+ {
+ "key": "security",
+ "operator": "In",
+ "values": "S1",
+ }
+ ]
+ },
+ "topologyKey": "failure-domain.beta.kubernetes.io/zone",
+ },
+ "weight": 1,
+ }
+ ]
+ }
+ },
secrets=[Secret("env", "AIRFLOW_SECRET", "ai", "secret_key")],
- security_context={'fsGroup': 0, 'runAsUser': 0},
- volumes=[Volume(name="myvolume", configs={'name': 'myvolume'}),
- Volume(name="airflow-config", configs={'configMap': {'data': 'airflow-data'},
- 'name': 'airflow-config'}),
- Volume(name='airflow-secret', configs={'name': 'airflow-secret',
- 'secret': {'secretName': 'secret-name'}}),
- Volume(name='init-secret', configs={'name': 'init-secret', 'secret':
- {'secretName': 'init-secret'}})],
+ security_context={"fsGroup": 0, "runAsUser": 0},
+ volumes=[
+ Volume(name="myvolume", configs={"name": "myvolume"}),
+ Volume(
+ name="airflow-config",
+ configs={
+ "configMap": {"data": "airflow-data"},
+ "name": "airflow-config",
+ },
+ ),
+ Volume(
+ name="airflow-secret",
+ configs={
+ "name": "airflow-secret",
+ "secret": {"secretName": "secret-name"},
+ },
+ ),
+ Volume(
+ name="init-secret",
+ configs={
+ "name": "init-secret",
+ "secret": {"secretName": "init-secret"},
+ },
+ ),
+ ],
)
expected_dict = expected.as_dict()
result_dict = result_pod.as_dict()
print(result_pod.volume_mounts)
parsed_configs = self.pull_out_volumes(result_dict)
- result_dict['volumes'] = parsed_configs
- self.assertEqual(result_dict['secrets'], expected_dict['secrets'])
+ result_dict["volumes"] = parsed_configs
+ self.assertEqual(result_dict["secrets"], expected_dict["secrets"])
self.assertDictEqual(expected_dict, result_dict)
@staticmethod
def pull_out_volumes(result_dict):
parsed_configs = []
- for volume in result_dict['volumes']:
- vol = {'name': volume['name']}
+ for volume in result_dict["volumes"]:
+ vol = {"name": volume["name"]}
confs = {}
- for k, v in volume['configs'].items():
- if v and k[0] != '_':
+ for k, v in volume["configs"].items():
+ if v and k[0] != "_":
confs[k] = v
- vol['configs'] = confs
+ vol["configs"] = confs
parsed_configs.append(vol)
return parsed_configs