You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/08/18 15:31:21 UTC

[airflow] branch v1-10-test updated: Fix issue with mounting volumes from secrets (#10366)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new c44fddf  Fix issue with mounting volumes from secrets (#10366)
c44fddf is described below

commit c44fddfafb5b9c58b40c8c4987102a049c26f1b9
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Tue Aug 18 16:30:41 2020 +0100

    Fix issue with mounting volumes from secrets (#10366)
    
    * Fix issue with mounting volumes from secrets
    
    Previously, we used the "name" field as the key to merge or extend
    items. However, this was problematic for volumes generated by secrets:
    the generated names would never collide, leading to duplicate values.
    
    (cherry picked from commit 635b04e9331f46cfb8b1810485681e8dc4581f38)
    
    * Fix tests
    
    * Simplified workflow. Secret-backed volumes will no longer be converted to Secret objects
    
    Co-authored-by: Daniel Imberman <da...@gmail.com>
---
 airflow/contrib/kubernetes/pod.py                | 25 +++--------------
 airflow/kubernetes/pod_generator.py              | 18 ++++++++-----
 airflow/kubernetes/pod_launcher.py               |  5 ++--
 airflow/kubernetes/worker_configuration.py       |  1 -
 kubernetes_tests/test_kubernetes_pod_operator.py | 34 ++++++++++++++++++++++++
 scripts/ci/kubernetes/secrets.yaml               | 25 +++++++++++++++++
 scripts/ci/libraries/_kind.sh                    |  1 +
 tests/kubernetes/test_pod_generator.py           |  3 ---
 tests/kubernetes/test_pod_launcher.py            | 17 +++++++-----
 tests/kubernetes/test_worker_configuration.py    |  9 +++++++
 tests/test_local_settings/test_local_settings.py | 16 +++++------
 11 files changed, 103 insertions(+), 51 deletions(-)

diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index 944cd8c..81e4149 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -175,7 +175,7 @@ class Pod(object):
         )
         for port in _extract_ports(self.ports):
             pod = port.attach_to_pod(pod)
-        volumes, _ = _extract_volumes_and_secrets(self.volumes, self.volume_mounts)
+        volumes = _extract_volumes(self.volumes)
         for volume in volumes:
             pod = volume.attach_to_pod(pod)
         for volume_mount in _extract_volume_mounts(self.volume_mounts):
@@ -279,37 +279,18 @@ def _extract_volume_mounts(volume_mounts):
                 sub_path=volume_mount.get("subPath"),
                 read_only=volume_mount.get("readOnly")
             )
-
         result.append(volume_mount)
     return result
 
 
-def _extract_volumes_and_secrets(volumes, volume_mounts):
+def _extract_volumes(volumes):
     result = []
     volumes = volumes or []  # type: List[Union[k8s.V1Volume, dict]]
-    secrets = []
-    volume_mount_dict = {
-        volume_mount.name: volume_mount
-        for volume_mount in _extract_volume_mounts(volume_mounts)
-    }
     for volume in volumes:
         if isinstance(volume, k8s.V1Volume):
-            secret = _extract_volume_secret(volume, volume_mount_dict.get(volume.name, None))
-            if secret:
-                secrets.append(secret)
-                continue
             volume = api_client.sanitize_for_serialization(volume)
             volume = Volume(name=volume.get("name"), configs=volume)
         if not isinstance(volume, Volume):
             volume = Volume(name=volume.get("name"), configs=volume)
         result.append(volume)
-    return result, secrets
-
-
-def _extract_volume_secret(volume, volume_mount):
-    if not volume.secret:
-        return None
-    if volume_mount:
-        return Secret("volume", volume_mount.mount_path, volume.name, volume.secret.secret_name)
-    else:
-        return Secret("volume", None, volume.name, volume.secret.secret_name)
+    return result
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 090e2b1..2c7145d 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -481,7 +481,11 @@ class PodGenerator(object):
 
         client_container = client_containers[0]
         base_container = base_containers[0]
-        client_container = extend_object_field(base_container, client_container, 'volume_mounts')
+        client_container = extend_object_field(
+            base_container,
+            client_container,
+            'volume_mounts',
+            'mount_path')
         client_container = extend_object_field(base_container, client_container, 'env')
         client_container = extend_object_field(base_container, client_container, 'env_from')
         client_container = extend_object_field(base_container, client_container, 'ports')
@@ -623,7 +627,7 @@ def merge_objects(base_obj, client_obj):
     return client_obj_cp
 
 
-def extend_object_field(base_obj, client_obj, field_name):
+def extend_object_field(base_obj, client_obj, field_name, field_to_merge="name"):
     """
     :param base_obj: an object which has a property `field_name` that is a list
     :param client_obj: an object which has a property `field_name` that is a list.
@@ -646,8 +650,8 @@ def extend_object_field(base_obj, client_obj, field_name):
         setattr(client_obj_cp, field_name, base_obj_field)
         return client_obj_cp
 
-    base_obj_set = _get_dict_from_list(base_obj_field)
-    client_obj_set = _get_dict_from_list(client_obj_field)
+    base_obj_set = _get_dict_from_list(base_obj_field, field_to_merge)
+    client_obj_set = _get_dict_from_list(client_obj_field, field_to_merge)
 
     appended_fields = _merge_list_of_objects(base_obj_set, client_obj_set)
 
@@ -666,16 +670,16 @@ def _merge_list_of_objects(base_obj_set, client_obj_set):
     return appended_fields
 
 
-def _get_dict_from_list(base_list):
+def _get_dict_from_list(base_list, field_to_merge="name"):
     """
     :type base_list: list(Optional[dict, *to_dict])
     """
     result = {}
     for obj in base_list:
         if isinstance(obj, dict):
-            result[obj['name']] = obj
+            result[obj[field_to_merge]] = obj
         elif hasattr(obj, "to_dict"):
-            result[obj.name] = obj
+            result[getattr(obj, field_to_merge)] = obj
         else:
             raise AirflowConfigException("Trying to merge invalid object {}".format(obj))
     return result
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 875a24c..e8d4d22 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -31,7 +31,7 @@ from requests.exceptions import BaseHTTPError
 from airflow import AirflowException
 from airflow import settings
 from airflow.contrib.kubernetes.pod import (
-    Pod, _extract_env_vars_and_secrets, _extract_volumes_and_secrets, _extract_volume_mounts,
+    Pod, _extract_env_vars_and_secrets, _extract_volumes, _extract_volume_mounts,
     _extract_ports, _extract_security_context
 )
 from airflow.kubernetes.kube_client import get_kube_client
@@ -300,8 +300,7 @@ def _convert_to_airflow_pod(pod):
     """
     base_container = pod.spec.containers[0]  # type: k8s.V1Container
     env_vars, secrets = _extract_env_vars_and_secrets(base_container.env)
-    volumes, vol_secrets = _extract_volumes_and_secrets(pod.spec.volumes, base_container.volume_mounts)
-    secrets.extend(vol_secrets)
+    volumes = _extract_volumes(pod.spec.volumes)
     api_client = ApiClient()
     init_containers = pod.spec.init_containers
     if pod.spec.init_containers is not None:
diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py
index 4eae2ef..5214d4c 100644
--- a/airflow/kubernetes/worker_configuration.py
+++ b/airflow/kubernetes/worker_configuration.py
@@ -435,7 +435,6 @@ class WorkerConfiguration(LoggingMixin):
         """Creates POD."""
         if self.kube_config.pod_template_file:
             return PodGenerator(pod_template_file=self.kube_config.pod_template_file).gen_pod()
-
         pod = PodGenerator(
             image=self.kube_config.kube_image,
             image_pull_policy=self.kube_config.kube_image_pull_policy or 'IfNotPresent',
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 50a1258..89572f9 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -284,6 +284,40 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
         self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])
 
+    def test_pod_with_volume_secret(self):
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            in_cluster=False,
+            labels={"foo": "bar"},
+            arguments=["echo 10"],
+            secrets=[Secret(
+                deploy_type="volume",
+                deploy_target="/var/location",
+                secret="my-secret",
+                key="content.json",
+            )],
+            name="airflow-test-pod",
+            task_id="task",
+            get_logs=True,
+            is_delete_operator_pod=True,
+        )
+
+        context = self.create_context(k)
+        k.execute(context)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['containers'][0]['volumeMounts'] = [
+            {'mountPath': '/var/location',
+             'name': mock.ANY,
+             'readOnly': True}]
+        self.expected_pod['spec']['volumes'] = [
+            {'name': mock.ANY,
+             'secret': {'secretName': 'my-secret'}}
+        ]
+        self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
+        self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])
+
     def test_pod_hostnetwork(self):
         k = KubernetesPodOperator(
             namespace='default',
diff --git a/scripts/ci/kubernetes/secrets.yaml b/scripts/ci/kubernetes/secrets.yaml
new file mode 100644
index 0000000..de7496a
--- /dev/null
+++ b/scripts/ci/kubernetes/secrets.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: my-secret
+type: Opaque
+data:
+  username: YWlyZmxvdw==
+  password: YWlyZmxvdw==
diff --git a/scripts/ci/libraries/_kind.sh b/scripts/ci/libraries/_kind.sh
index 173af6d..889212f 100644
--- a/scripts/ci/libraries/_kind.sh
+++ b/scripts/ci/libraries/_kind.sh
@@ -314,6 +314,7 @@ function deploy_test_kubernetes_resources() {
     echo "Deploying Custom kubernetes resources"
     echo
     verbose_kubectl apply -f "scripts/ci/kubernetes/volumes.yaml" --namespace default
+    verbose_kubectl apply -f "scripts/ci/kubernetes/secrets.yaml" --namespace default
 }
 
 
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index bb714d4..43598ce 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -607,9 +607,6 @@ class TestPodGenerator(unittest.TestCase):
                     }],
                     'volumeMounts': [{
                         'mountPath': '/foo/',
-                        'name': 'example-kubernetes-test-volume1'
-                    }, {
-                        'mountPath': '/foo/',
                         'name': 'example-kubernetes-test-volume2'
                     }]
                 }],
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index 64c24c6..f56615f 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -242,13 +242,12 @@ class TestPodLauncherHelper(unittest.TestCase):
                         name="airflow-secret",
                         secret=k8s.V1SecretVolumeSource(
                             secret_name="secret-name",
-
                         )
                     ),
                     k8s.V1Volume(
                         name="init-secret",
                         secret=k8s.V1SecretVolumeSource(
-                            secret_name="secret-name",
+                            secret_name="init-secret",
                         )
                     )
                 ]
@@ -283,22 +282,26 @@ class TestPodLauncherHelper(unittest.TestCase):
                 ),
                 VolumeMount(
                     name="airflow-secret",
-                    read_only=True,
                     mount_path="/opt/mount",
                     sub_path=None,
+                    read_only=True
                 )],
-            secrets=[Secret("env", "AIRFLOW_SECRET", "ai", "secret_key"),
-                     Secret('volume', '/opt/mount', 'airflow-secret', "secret-name"),
-                     Secret('volume', None, 'init-secret', 'secret-name')],
+            secrets=[Secret("env", "AIRFLOW_SECRET", "ai", "secret_key")],
             security_context={'fsGroup': 0, 'runAsUser': 0},
             volumes=[Volume(name="myvolume", configs={'name': 'myvolume'}),
                      Volume(name="airflow-config", configs={'configMap': {'data': 'airflow-data'},
-                                                            'name': 'airflow-config'})]
+                                                            'name': 'airflow-config'}),
+                     Volume(name='airflow-secret', configs={'name': 'airflow-secret',
+                                                            'secret': {'secretName': 'secret-name'}}),
+                     Volume(name='init-secret', configs={'name': 'init-secret', 'secret':
+                            {'secretName': 'init-secret'}})],
         )
         expected_dict = expected.as_dict()
         result_dict = result_pod.as_dict()
+        print(result_pod.volume_mounts)
         parsed_configs = self.pull_out_volumes(result_dict)
         result_dict['volumes'] = parsed_configs
+        self.assertEqual(result_dict['secrets'], expected_dict['secrets'])
         self.assertDictEqual(expected_dict, result_dict)
 
     @staticmethod
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
index 48dc3e3..59c9326 100644
--- a/tests/kubernetes/test_worker_configuration.py
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -355,6 +355,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         self.kube_config.worker_run_as_user = 0
         self.kube_config.dags_volume_claim = None
         self.kube_config.dags_volume_host = None
+        self.kube_config.base_log_folder = '/logs'
         self.kube_config.dags_in_image = None
         self.kube_config.worker_fs_group = None
         self.kube_config.git_dags_folder_mount_point = 'dags'
@@ -369,6 +370,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
     def test_make_pod_assert_labels(self):
         # Tests the pod created has all the expected labels set
         self.kube_config.dags_folder = 'dags'
+        self.kube_config.base_log_folder = '/logs'
 
         worker_config = WorkerConfiguration(self.kube_config)
         pod = PodGenerator.construct_pod(
@@ -402,6 +404,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         self.kube_config.dags_volume_host = None
         self.kube_config.dags_in_image = None
         self.kube_config.worker_fs_group = None
+        self.kube_config.base_log_folder = '/logs'
         self.kube_config.git_dags_folder_mount_point = 'dags'
         self.kube_config.git_sync_dest = 'repo'
         self.kube_config.git_subpath = 'path'
@@ -431,6 +434,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         self.kube_config.dags_volume_host = None
         self.kube_config.dags_in_image = None
         self.kube_config.worker_fs_group = None
+        self.kube_config.base_log_folder = '/logs'
         self.kube_config.git_dags_folder_mount_point = 'dags'
         self.kube_config.git_sync_dest = 'repo'
         self.kube_config.git_subpath = 'path'
@@ -469,6 +473,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         self.kube_config.dags_volume_host = None
         self.kube_config.dags_in_image = None
         self.kube_config.worker_fs_group = None
+        self.kube_config.base_log_folder = '/logs'
         self.kube_config.git_dags_folder_mount_point = 'dags'
         self.kube_config.git_sync_dest = 'repo'
         self.kube_config.git_subpath = 'path'
@@ -515,6 +520,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         self.kube_config.kube_tolerations = self.tolerations_config
         self.kube_config.kube_annotations = self.worker_annotations_config
         self.kube_config.dags_folder = 'dags'
+        self.kube_config.base_log_folder = '/logs'
         worker_config = WorkerConfiguration(self.kube_config)
         pod = worker_config.as_pod()
 
@@ -533,6 +539,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
 
     def test_make_pod_with_executor_config(self):
         self.kube_config.dags_folder = 'dags'
+        self.kube_config.base_log_folder = '/logs'
         worker_config = WorkerConfiguration(self.kube_config)
         config_pod = PodGenerator(
             image='',
@@ -722,6 +729,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         configmap than airflow_configmap (airflow.cfg)
         """
         self.kube_config.airflow_home = '/usr/local/airflow'
+        self.kube_config.base_log_folder = '/logs'
         self.kube_config.airflow_configmap = 'airflow-configmap'
         self.kube_config.airflow_local_settings_configmap = 'airflow-ls-configmap'
         self.kube_config.dags_folder = '/workers/path/to/dags'
@@ -792,6 +800,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
         self.kube_config.dags_volume_host = None
         self.kube_config.dags_in_image = None
         self.kube_config.git_dags_folder_mount_point = 'dags'
+        self.kube_config.base_log_folder = '/logs'
         self.kube_config.git_sync_dest = 'repo'
         self.kube_config.git_subpath = 'path'
         self.kube_config.image_pull_secrets = 'image_pull_secret1,image_pull_secret2'
diff --git a/tests/test_local_settings/test_local_settings.py b/tests/test_local_settings/test_local_settings.py
index 7c4abf1..9fab355 100644
--- a/tests/test_local_settings/test_local_settings.py
+++ b/tests/test_local_settings/test_local_settings.py
@@ -338,14 +338,14 @@ class LocalSettingsTest(unittest.TestCase):
                                           'resources': {'limits': {'nvidia.com/gpu': '200G'},
                                                         'requests': {'cpu': '200Mi',
                                                                      'memory': '2G'}},
-                                          'volumeMounts': [{'mountPath': '/opt/airflow/secrets/',
-                                                            'name': 'airflow-secrets-mount',
-                                                            'readOnly': True},
-                                                           {'mountPath': '/mnt',
-                                                            'name': 'foo',
-                                                            'readOnly': True,
-                                                            'subPath': '/'}
-                                                           ]}],
+                                          'volumeMounts': [
+                                              {'mountPath': '/mnt',
+                                               'name': 'foo',
+                                               'readOnly': True,
+                                               'subPath': '/'},
+                                              {'mountPath': '/opt/airflow/secrets/',
+                                               'name': 'airflow-secrets-mount',
+                                               'readOnly': True}]}],
                           'hostNetwork': False,
                           'imagePullSecrets': [],
                           'initContainers': [{'name': 'init-container',