You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/19 19:46:15 UTC

[airflow] 06/06: Fixes issue with affinity backcompat in Airflow 1.10

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b9a275585980bebb647eb27e278b6d3c2cc8331a
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Thu Nov 19 10:29:59 2020 -0800

    Fixes issue with affinity backcompat in Airflow 1.10
    
    There was a breaking change in 1.10.12 where the affinity argument
    was being turned into a k8s.V1Affinity object instead of a python dict.
    
    This commit solves https://github.com/apache/airflow/issues/11731
---
 airflow/kubernetes/pod_launcher.py    |   2 +-
 tests/kubernetes/test_pod_launcher.py | 319 ++++++++++++++++++++--------------
 2 files changed, 189 insertions(+), 132 deletions(-)

diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 704a77e..468e077 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -326,7 +326,7 @@ def _convert_to_airflow_pod(pod):
         resources=base_container.resources,
         service_account_name=pod.spec.service_account_name,
         secrets=secrets,
-        affinity=pod.spec.affinity,
+        affinity=api_client.sanitize_for_serialization(pod.spec.affinity),
         hostnetwork=pod.spec.host_network,
         security_context=_extract_security_context(pod.spec.security_context)
     )
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index 63169ae..c705f2e 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -30,7 +30,6 @@ from airflow.kubernetes.volume_mount import VolumeMount
 
 
 class TestPodLauncher(unittest.TestCase):
-
     def setUp(self):
         self.mock_kube_client = mock.Mock()
         self.pod_launcher = PodLauncher(kube_client=self.mock_kube_client)
@@ -44,60 +43,60 @@ class TestPodLauncher(unittest.TestCase):
     def test_read_pod_logs_retries_successfully(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.side_effect = [
-            BaseHTTPError('Boom'),
-            mock.sentinel.logs
+            BaseHTTPError("Boom"),
+            mock.sentinel.logs,
         ]
         logs = self.pod_launcher.read_pod_logs(mock.sentinel)
         self.assertEqual(mock.sentinel.logs, logs)
-        self.mock_kube_client.read_namespaced_pod_log.assert_has_calls([
-            mock.call(
-                _preload_content=False,
-                container='base',
-                follow=True,
-                name=mock.sentinel.metadata.name,
-                namespace=mock.sentinel.metadata.namespace,
-                tail_lines=10
-            ),
-            mock.call(
-                _preload_content=False,
-                container='base',
-                follow=True,
-                name=mock.sentinel.metadata.name,
-                namespace=mock.sentinel.metadata.namespace,
-                tail_lines=10
-            )
-        ])
+        self.mock_kube_client.read_namespaced_pod_log.assert_has_calls(
+            [
+                mock.call(
+                    _preload_content=False,
+                    container="base",
+                    follow=True,
+                    name=mock.sentinel.metadata.name,
+                    namespace=mock.sentinel.metadata.namespace,
+                    tail_lines=10,
+                ),
+                mock.call(
+                    _preload_content=False,
+                    container="base",
+                    follow=True,
+                    name=mock.sentinel.metadata.name,
+                    namespace=mock.sentinel.metadata.namespace,
+                    tail_lines=10,
+                ),
+            ]
+        )
 
     def test_read_pod_logs_retries_fails(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.side_effect = [
-            BaseHTTPError('Boom'),
-            BaseHTTPError('Boom'),
-            BaseHTTPError('Boom')
+            BaseHTTPError("Boom"),
+            BaseHTTPError("Boom"),
+            BaseHTTPError("Boom"),
         ]
         self.assertRaises(
-            AirflowException,
-            self.pod_launcher.read_pod_logs,
-            mock.sentinel
+            AirflowException, self.pod_launcher.read_pod_logs, mock.sentinel
         )
 
     def test_read_pod_logs_successfully_with_tail_lines(self):
         mock.sentinel.metadata = mock.MagicMock()
-        self.mock_kube_client.read_namespaced_pod_log.side_effect = [
-            mock.sentinel.logs
-        ]
+        self.mock_kube_client.read_namespaced_pod_log.side_effect = [mock.sentinel.logs]
         logs = self.pod_launcher.read_pod_logs(mock.sentinel, 100)
         self.assertEqual(mock.sentinel.logs, logs)
-        self.mock_kube_client.read_namespaced_pod_log.assert_has_calls([
-            mock.call(
-                _preload_content=False,
-                container='base',
-                follow=True,
-                name=mock.sentinel.metadata.name,
-                namespace=mock.sentinel.metadata.namespace,
-                tail_lines=100
-            ),
-        ])
+        self.mock_kube_client.read_namespaced_pod_log.assert_has_calls(
+            [
+                mock.call(
+                    _preload_content=False,
+                    container="base",
+                    follow=True,
+                    name=mock.sentinel.metadata.name,
+                    namespace=mock.sentinel.metadata.namespace,
+                    tail_lines=100,
+                ),
+            ]
+        )
 
     def test_read_pod_events_successfully_returns_events(self):
         mock.sentinel.metadata = mock.MagicMock()
@@ -108,33 +107,37 @@ class TestPodLauncher(unittest.TestCase):
     def test_read_pod_events_retries_successfully(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.list_namespaced_event.side_effect = [
-            BaseHTTPError('Boom'),
-            mock.sentinel.events
+            BaseHTTPError("Boom"),
+            mock.sentinel.events,
         ]
         events = self.pod_launcher.read_pod_events(mock.sentinel)
         self.assertEqual(mock.sentinel.events, events)
-        self.mock_kube_client.list_namespaced_event.assert_has_calls([
-            mock.call(
-                namespace=mock.sentinel.metadata.namespace,
-                field_selector="involvedObject.name={}".format(mock.sentinel.metadata.name)
-            ),
-            mock.call(
-                namespace=mock.sentinel.metadata.namespace,
-                field_selector="involvedObject.name={}".format(mock.sentinel.metadata.name)
-            )
-        ])
+        self.mock_kube_client.list_namespaced_event.assert_has_calls(
+            [
+                mock.call(
+                    namespace=mock.sentinel.metadata.namespace,
+                    field_selector="involvedObject.name={}".format(
+                        mock.sentinel.metadata.name
+                    ),
+                ),
+                mock.call(
+                    namespace=mock.sentinel.metadata.namespace,
+                    field_selector="involvedObject.name={}".format(
+                        mock.sentinel.metadata.name
+                    ),
+                ),
+            ]
+        )
 
     def test_read_pod_events_retries_fails(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.list_namespaced_event.side_effect = [
-            BaseHTTPError('Boom'),
-            BaseHTTPError('Boom'),
-            BaseHTTPError('Boom')
+            BaseHTTPError("Boom"),
+            BaseHTTPError("Boom"),
+            BaseHTTPError("Boom"),
         ]
         self.assertRaises(
-            AirflowException,
-            self.pod_launcher.read_pod_events,
-            mock.sentinel
+            AirflowException, self.pod_launcher.read_pod_events, mock.sentinel
         )
 
     def test_read_pod_returns_logs(self):
@@ -146,42 +149,66 @@ class TestPodLauncher(unittest.TestCase):
     def test_read_pod_retries_successfully(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod.side_effect = [
-            BaseHTTPError('Boom'),
-            mock.sentinel.pod_info
+            BaseHTTPError("Boom"),
+            mock.sentinel.pod_info,
         ]
         pod_info = self.pod_launcher.read_pod(mock.sentinel)
         self.assertEqual(mock.sentinel.pod_info, pod_info)
-        self.mock_kube_client.read_namespaced_pod.assert_has_calls([
-            mock.call(mock.sentinel.metadata.name, mock.sentinel.metadata.namespace),
-            mock.call(mock.sentinel.metadata.name, mock.sentinel.metadata.namespace)
-        ])
+        self.mock_kube_client.read_namespaced_pod.assert_has_calls(
+            [
+                mock.call(
+                    mock.sentinel.metadata.name, mock.sentinel.metadata.namespace
+                ),
+                mock.call(
+                    mock.sentinel.metadata.name, mock.sentinel.metadata.namespace
+                ),
+            ]
+        )
 
     def test_read_pod_retries_fails(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod.side_effect = [
-            BaseHTTPError('Boom'),
-            BaseHTTPError('Boom'),
-            BaseHTTPError('Boom')
+            BaseHTTPError("Boom"),
+            BaseHTTPError("Boom"),
+            BaseHTTPError("Boom"),
         ]
-        self.assertRaises(
-            AirflowException,
-            self.pod_launcher.read_pod,
-            mock.sentinel
-        )
+        self.assertRaises(AirflowException, self.pod_launcher.read_pod, mock.sentinel)
 
 
 class TestPodLauncherHelper(unittest.TestCase):
     def test_convert_to_airflow_pod(self):
         input_pod = k8s.V1Pod(
             metadata=k8s.V1ObjectMeta(
-                name="foo",
-                namespace="bar"
+                name="foo", namespace="bar", annotations={"foo": "bar"}
             ),
             spec=k8s.V1PodSpec(
+                affinity=k8s.V1Affinity(
+                    pod_anti_affinity=k8s.V1PodAntiAffinity(
+                        required_during_scheduling_ignored_during_execution=[
+                            k8s.V1WeightedPodAffinityTerm(
+                                weight=1,
+                                pod_affinity_term=k8s.V1PodAffinityTerm(
+                                    label_selector=k8s.V1LabelSelector(
+                                        match_expressions=[
+                                            k8s.V1LabelSelectorRequirement(
+                                                key="security",
+                                                operator="In",
+                                                values="S1",
+                                            )
+                                        ]
+                                    ),
+                                    topology_key="failure-domain.beta.kubernetes.io/zone",
+                                ),
+                            )
+                        ]
+                    )
+                ),
                 init_containers=[
                     k8s.V1Container(
                         name="init-container",
-                        volume_mounts=[k8s.V1VolumeMount(mount_path="/tmp", name="init-secret")]
+                        volume_mounts=[
+                            k8s.V1VolumeMount(mount_path="/tmp", name="init-secret")
+                        ],
                     )
                 ],
                 containers=[
@@ -194,127 +221,157 @@ class TestPodLauncherHelper(unittest.TestCase):
                                 name="AIRFLOW_SECRET",
                                 value_from=k8s.V1EnvVarSource(
                                     secret_key_ref=k8s.V1SecretKeySelector(
-                                        name="ai",
-                                        key="secret_key"
+                                        name="ai", key="secret_key"
                                     )
-                                ))
+                                ),
+                            )
                         ],
                         ports=[
-                            k8s.V1ContainerPort(
-                                name="myport",
-                                container_port=8080,
-                            )
+                            k8s.V1ContainerPort(name="myport", container_port=8080,)
                         ],
                         volume_mounts=[
                             k8s.V1VolumeMount(
                                 name="myvolume",
                                 mount_path="/tmp/mount",
-                                read_only="True"
+                                read_only="True",
                             ),
                             k8s.V1VolumeMount(
-                                name='airflow-config',
-                                mount_path='/config',
-                                sub_path='airflow.cfg',
-                                read_only=True
+                                name="airflow-config",
+                                mount_path="/config",
+                                sub_path="airflow.cfg",
+                                read_only=True,
                             ),
                             k8s.V1VolumeMount(
                                 name="airflow-secret",
                                 mount_path="/opt/mount",
-                                read_only=True
-                            )]
+                                read_only=True,
+                            ),
+                        ],
                     )
                 ],
-                security_context=k8s.V1PodSecurityContext(
-                    run_as_user=0,
-                    fs_group=0,
-                ),
+                security_context=k8s.V1PodSecurityContext(run_as_user=0, fs_group=0,),
                 image_pull_secrets=[k8s.V1LocalObjectReference("my-secret")],
                 volumes=[
-                    k8s.V1Volume(
-                        name="myvolume"
-                    ),
+                    k8s.V1Volume(name="myvolume"),
                     k8s.V1Volume(
                         name="airflow-config",
-                        config_map=k8s.V1ConfigMap(
-                            data="airflow-data"
-                        )
+                        config_map=k8s.V1ConfigMap(data="airflow-data"),
                     ),
                     k8s.V1Volume(
                         name="airflow-secret",
-                        secret=k8s.V1SecretVolumeSource(
-                            secret_name="secret-name",
-                        )
+                        secret=k8s.V1SecretVolumeSource(secret_name="secret-name",),
                     ),
                     k8s.V1Volume(
                         name="init-secret",
-                        secret=k8s.V1SecretVolumeSource(
-                            secret_name="init-secret",
-                        )
-                    )
-                ]
-            )
+                        secret=k8s.V1SecretVolumeSource(secret_name="init-secret",),
+                    ),
+                ],
+            ),
         )
         result_pod = _convert_to_airflow_pod(input_pod)
 
+        self.assertEqual(type(result_pod.affinity), dict)
+
         expected = Pod(
             name="foo",
             namespace="bar",
+            annotations={"foo": "bar"},
             envs={},
             init_containers=[
-                {'name': 'init-container', 'volumeMounts': [{'mountPath': '/tmp', 'name': 'init-secret'}]}
+                {
+                    "name": "init-container",
+                    "volumeMounts": [{"mountPath": "/tmp", "name": "init-secret"}],
+                }
             ],
             cmds=["foo"],
             image="myimage",
-            ports=[
-                Port(name="myport", container_port=8080)
-            ],
+            ports=[Port(name="myport", container_port=8080)],
             volume_mounts=[
                 VolumeMount(
                     name="myvolume",
                     mount_path="/tmp/mount",
                     sub_path=None,
-                    read_only="True"
+                    read_only="True",
                 ),
                 VolumeMount(
                     name="airflow-config",
                     read_only=True,
                     mount_path="/config",
-                    sub_path="airflow.cfg"
+                    sub_path="airflow.cfg",
                 ),
                 VolumeMount(
                     name="airflow-secret",
                     mount_path="/opt/mount",
                     sub_path=None,
-                    read_only=True
-                )],
+                    read_only=True,
+                ),
+            ],
             image_pull_secrets="my-secret",
+            affinity={
+                "podAntiAffinity": {
+                    "requiredDuringSchedulingIgnoredDuringExecution": [
+                        {
+                            "podAffinityTerm": {
+                                "labelSelector": {
+                                    "matchExpressions": [
+                                        {
+                                            "key": "security",
+                                            "operator": "In",
+                                            "values": "S1",
+                                        }
+                                    ]
+                                },
+                                "topologyKey": "failure-domain.beta.kubernetes.io/zone",
+                            },
+                            "weight": 1,
+                        }
+                    ]
+                }
+            },
             secrets=[Secret("env", "AIRFLOW_SECRET", "ai", "secret_key")],
-            security_context={'fsGroup': 0, 'runAsUser': 0},
-            volumes=[Volume(name="myvolume", configs={'name': 'myvolume'}),
-                     Volume(name="airflow-config", configs={'configMap': {'data': 'airflow-data'},
-                                                            'name': 'airflow-config'}),
-                     Volume(name='airflow-secret', configs={'name': 'airflow-secret',
-                                                            'secret': {'secretName': 'secret-name'}}),
-                     Volume(name='init-secret', configs={'name': 'init-secret', 'secret':
-                            {'secretName': 'init-secret'}})],
+            security_context={"fsGroup": 0, "runAsUser": 0},
+            volumes=[
+                Volume(name="myvolume", configs={"name": "myvolume"}),
+                Volume(
+                    name="airflow-config",
+                    configs={
+                        "configMap": {"data": "airflow-data"},
+                        "name": "airflow-config",
+                    },
+                ),
+                Volume(
+                    name="airflow-secret",
+                    configs={
+                        "name": "airflow-secret",
+                        "secret": {"secretName": "secret-name"},
+                    },
+                ),
+                Volume(
+                    name="init-secret",
+                    configs={
+                        "name": "init-secret",
+                        "secret": {"secretName": "init-secret"},
+                    },
+                ),
+            ],
         )
         expected_dict = expected.as_dict()
         result_dict = result_pod.as_dict()
         print(result_pod.volume_mounts)
         parsed_configs = self.pull_out_volumes(result_dict)
-        result_dict['volumes'] = parsed_configs
-        self.assertEqual(result_dict['secrets'], expected_dict['secrets'])
+        result_dict["volumes"] = parsed_configs
+        self.assertEqual(result_dict["secrets"], expected_dict["secrets"])
         self.assertDictEqual(expected_dict, result_dict)
 
     @staticmethod
     def pull_out_volumes(result_dict):
         parsed_configs = []
-        for volume in result_dict['volumes']:
-            vol = {'name': volume['name']}
+        for volume in result_dict["volumes"]:
+            vol = {"name": volume["name"]}
             confs = {}
-            for k, v in volume['configs'].items():
-                if v and k[0] != '_':
+            for k, v in volume["configs"].items():
+                if v and k[0] != "_":
                     confs[k] = v
-            vol['configs'] = confs
+            vol["configs"] = confs
             parsed_configs.append(vol)
         return parsed_configs