You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/08/17 16:38:47 UTC

[airflow] branch v1-10-test updated: Fix issue with mounting volumes from secrets

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 1ad6ec4  Fix issue with mounting volumes from secrets
1ad6ec4 is described below

commit 1ad6ec43439cb1781176e567d3c7fab813404f1f
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Mon Aug 17 09:19:57 2020 -0700

    Fix issue with mounting volumes from secrets
    
    Previously, we were using the "name" variable to merge or extend
    items, however this was problematic in volumes generated by secrets
    as the generated names would not collide leading to duplicate values
    
    (cherry picked from commit 635b04e9331f46cfb8b1810485681e8dc4581f38)
---
 airflow/contrib/kubernetes/pod.py                | 14 ++++++----
 airflow/kubernetes/pod_generator.py              | 18 ++++++++-----
 airflow/kubernetes/pod_launcher.py               |  2 +-
 kubernetes_tests/test_kubernetes_pod_operator.py | 34 ++++++++++++++++++++++++
 scripts/ci/kubernetes/secrets.yaml               | 25 +++++++++++++++++
 scripts/ci/libraries/_kind.sh                    |  1 +
 6 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index 944cd8c..267b879 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -260,7 +260,11 @@ def _extract_security_context(security_context):
     return security_context
 
 
-def _extract_volume_mounts(volume_mounts):
+def _extract_volume_mounts(volume_mounts, volume_secrets_to_exclude=None):
+    volume_secrets_to_exclude = volume_secrets_to_exclude or []  # type: List[Secret]
+    volume_secret_mount_paths = set(
+        [v.deploy_target for v in volume_secrets_to_exclude if v.deploy_type == "volume"]
+    )
     result = []
     volume_mounts = volume_mounts or []  # type: List[Union[k8s.V1VolumeMount, dict]]
     for volume_mount in volume_mounts:
@@ -279,8 +283,8 @@ def _extract_volume_mounts(volume_mounts):
                 sub_path=volume_mount.get("subPath"),
                 read_only=volume_mount.get("readOnly")
             )
-
-        result.append(volume_mount)
+        if volume_mount.mount_path not in volume_secret_mount_paths:
+            result.append(volume_mount)
     return result
 
 
@@ -310,6 +314,6 @@ def _extract_volume_secret(volume, volume_mount):
     if not volume.secret:
         return None
     if volume_mount:
-        return Secret("volume", volume_mount.mount_path, volume.name, volume.secret.secret_name)
+        return Secret("volume", volume_mount.mount_path, volume.secret.secret_name, volume.secret.key)
     else:
-        return Secret("volume", None, volume.name, volume.secret.secret_name)
+        return Secret("volume", None, volume.secret.secret_name, volume.secret.key)
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 090e2b1..2c7145d 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -481,7 +481,11 @@ class PodGenerator(object):
 
         client_container = client_containers[0]
         base_container = base_containers[0]
-        client_container = extend_object_field(base_container, client_container, 'volume_mounts')
+        client_container = extend_object_field(
+            base_container,
+            client_container,
+            'volume_mounts',
+            'mount_path')
         client_container = extend_object_field(base_container, client_container, 'env')
         client_container = extend_object_field(base_container, client_container, 'env_from')
         client_container = extend_object_field(base_container, client_container, 'ports')
@@ -623,7 +627,7 @@ def merge_objects(base_obj, client_obj):
     return client_obj_cp
 
 
-def extend_object_field(base_obj, client_obj, field_name):
+def extend_object_field(base_obj, client_obj, field_name, field_to_merge="name"):
     """
     :param base_obj: an object which has a property `field_name` that is a list
     :param client_obj: an object which has a property `field_name` that is a list.
@@ -646,8 +650,8 @@ def extend_object_field(base_obj, client_obj, field_name):
         setattr(client_obj_cp, field_name, base_obj_field)
         return client_obj_cp
 
-    base_obj_set = _get_dict_from_list(base_obj_field)
-    client_obj_set = _get_dict_from_list(client_obj_field)
+    base_obj_set = _get_dict_from_list(base_obj_field, field_to_merge)
+    client_obj_set = _get_dict_from_list(client_obj_field, field_to_merge)
 
     appended_fields = _merge_list_of_objects(base_obj_set, client_obj_set)
 
@@ -666,16 +670,16 @@ def _merge_list_of_objects(base_obj_set, client_obj_set):
     return appended_fields
 
 
-def _get_dict_from_list(base_list):
+def _get_dict_from_list(base_list, field_to_merge="name"):
     """
     :type base_list: list(Optional[dict, *to_dict])
     """
     result = {}
     for obj in base_list:
         if isinstance(obj, dict):
-            result[obj['name']] = obj
+            result[obj[field_to_merge]] = obj
         elif hasattr(obj, "to_dict"):
-            result[obj.name] = obj
+            result[getattr(obj, field_to_merge)] = obj
         else:
             raise AirflowConfigException("Trying to merge invalid object {}".format(obj))
     return result
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 875a24c..69ab56d 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -317,7 +317,7 @@ def _convert_to_airflow_pod(pod):
         name=pod.metadata.name,
         ports=_extract_ports(base_container.ports),
         volumes=volumes,
-        volume_mounts=_extract_volume_mounts(base_container.volume_mounts),
+        volume_mounts=_extract_volume_mounts(base_container.volume_mounts, vol_secrets),
         namespace=pod.metadata.namespace,
         image_pull_policy=base_container.image_pull_policy or 'IfNotPresent',
         tolerations=pod.spec.tolerations,
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 50a1258..89572f9 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -284,6 +284,40 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
         self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])
 
+    def test_pod_with_volume_secret(self):
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            in_cluster=False,
+            labels={"foo": "bar"},
+            arguments=["echo 10"],
+            secrets=[Secret(
+                deploy_type="volume",
+                deploy_target="/var/location",
+                secret="my-secret",
+                key="content.json",
+            )],
+            name="airflow-test-pod",
+            task_id="task",
+            get_logs=True,
+            is_delete_operator_pod=True,
+        )
+
+        context = self.create_context(k)
+        k.execute(context)
+        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+        self.expected_pod['spec']['containers'][0]['volumeMounts'] = [
+            {'mountPath': '/var/location',
+             'name': mock.ANY,
+             'readOnly': True}]
+        self.expected_pod['spec']['volumes'] = [
+            {'name': mock.ANY,
+             'secret': {'secretName': 'my-secret'}}
+        ]
+        self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
+        self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])
+
     def test_pod_hostnetwork(self):
         k = KubernetesPodOperator(
             namespace='default',
diff --git a/scripts/ci/kubernetes/secrets.yaml b/scripts/ci/kubernetes/secrets.yaml
new file mode 100644
index 0000000..de7496a
--- /dev/null
+++ b/scripts/ci/kubernetes/secrets.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: my-secret
+type: Opaque
+data:
+  username: YWlyZmxvdw==
+  password: YWlyZmxvdw==
diff --git a/scripts/ci/libraries/_kind.sh b/scripts/ci/libraries/_kind.sh
index 173af6d..889212f 100644
--- a/scripts/ci/libraries/_kind.sh
+++ b/scripts/ci/libraries/_kind.sh
@@ -314,6 +314,7 @@ function deploy_test_kubernetes_resources() {
     echo "Deploying Custom kubernetes resources"
     echo
     verbose_kubectl apply -f "scripts/ci/kubernetes/volumes.yaml" --namespace default
+    verbose_kubectl apply -f "scripts/ci/kubernetes/secrets.yaml" --namespace default
 }