You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/08/18 15:31:21 UTC
[airflow] branch v1-10-test updated: Fix issue with mounting volumes from secrets (#10366)
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new c44fddf Fix issue with mounting volumes from secrets (#10366)
c44fddf is described below
commit c44fddfafb5b9c58b40c8c4987102a049c26f1b9
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Tue Aug 18 16:30:41 2020 +0100
Fix issue with mounting volumes from secrets (#10366)
* Fix issue with mounting volumes from secrets
Previously, we were using the "name" variable to merge or extend
items; however, this was problematic for volumes generated by secrets,
as the generated names would not collide, leading to duplicate values.
(cherry picked from commit 635b04e9331f46cfb8b1810485681e8dc4581f38)
* fix tests
* Simplified workflow. Secrets will no longer be converted to secret objects
Co-authored-by: Daniel Imberman <da...@gmail.com>
---
airflow/contrib/kubernetes/pod.py | 25 +++--------------
airflow/kubernetes/pod_generator.py | 18 ++++++++-----
airflow/kubernetes/pod_launcher.py | 5 ++--
airflow/kubernetes/worker_configuration.py | 1 -
kubernetes_tests/test_kubernetes_pod_operator.py | 34 ++++++++++++++++++++++++
scripts/ci/kubernetes/secrets.yaml | 25 +++++++++++++++++
scripts/ci/libraries/_kind.sh | 1 +
tests/kubernetes/test_pod_generator.py | 3 ---
tests/kubernetes/test_pod_launcher.py | 17 +++++++-----
tests/kubernetes/test_worker_configuration.py | 9 +++++++
tests/test_local_settings/test_local_settings.py | 16 +++++------
11 files changed, 103 insertions(+), 51 deletions(-)
diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py
index 944cd8c..81e4149 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -175,7 +175,7 @@ class Pod(object):
)
for port in _extract_ports(self.ports):
pod = port.attach_to_pod(pod)
- volumes, _ = _extract_volumes_and_secrets(self.volumes, self.volume_mounts)
+ volumes = _extract_volumes(self.volumes)
for volume in volumes:
pod = volume.attach_to_pod(pod)
for volume_mount in _extract_volume_mounts(self.volume_mounts):
@@ -279,37 +279,18 @@ def _extract_volume_mounts(volume_mounts):
sub_path=volume_mount.get("subPath"),
read_only=volume_mount.get("readOnly")
)
-
result.append(volume_mount)
return result
-def _extract_volumes_and_secrets(volumes, volume_mounts):
+def _extract_volumes(volumes):
result = []
volumes = volumes or [] # type: List[Union[k8s.V1Volume, dict]]
- secrets = []
- volume_mount_dict = {
- volume_mount.name: volume_mount
- for volume_mount in _extract_volume_mounts(volume_mounts)
- }
for volume in volumes:
if isinstance(volume, k8s.V1Volume):
- secret = _extract_volume_secret(volume, volume_mount_dict.get(volume.name, None))
- if secret:
- secrets.append(secret)
- continue
volume = api_client.sanitize_for_serialization(volume)
volume = Volume(name=volume.get("name"), configs=volume)
if not isinstance(volume, Volume):
volume = Volume(name=volume.get("name"), configs=volume)
result.append(volume)
- return result, secrets
-
-
-def _extract_volume_secret(volume, volume_mount):
- if not volume.secret:
- return None
- if volume_mount:
- return Secret("volume", volume_mount.mount_path, volume.name, volume.secret.secret_name)
- else:
- return Secret("volume", None, volume.name, volume.secret.secret_name)
+ return result
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 090e2b1..2c7145d 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -481,7 +481,11 @@ class PodGenerator(object):
client_container = client_containers[0]
base_container = base_containers[0]
- client_container = extend_object_field(base_container, client_container, 'volume_mounts')
+ client_container = extend_object_field(
+ base_container,
+ client_container,
+ 'volume_mounts',
+ 'mount_path')
client_container = extend_object_field(base_container, client_container, 'env')
client_container = extend_object_field(base_container, client_container, 'env_from')
client_container = extend_object_field(base_container, client_container, 'ports')
@@ -623,7 +627,7 @@ def merge_objects(base_obj, client_obj):
return client_obj_cp
-def extend_object_field(base_obj, client_obj, field_name):
+def extend_object_field(base_obj, client_obj, field_name, field_to_merge="name"):
"""
:param base_obj: an object which has a property `field_name` that is a list
:param client_obj: an object which has a property `field_name` that is a list.
@@ -646,8 +650,8 @@ def extend_object_field(base_obj, client_obj, field_name):
setattr(client_obj_cp, field_name, base_obj_field)
return client_obj_cp
- base_obj_set = _get_dict_from_list(base_obj_field)
- client_obj_set = _get_dict_from_list(client_obj_field)
+ base_obj_set = _get_dict_from_list(base_obj_field, field_to_merge)
+ client_obj_set = _get_dict_from_list(client_obj_field, field_to_merge)
appended_fields = _merge_list_of_objects(base_obj_set, client_obj_set)
@@ -666,16 +670,16 @@ def _merge_list_of_objects(base_obj_set, client_obj_set):
return appended_fields
-def _get_dict_from_list(base_list):
+def _get_dict_from_list(base_list, field_to_merge="name"):
"""
:type base_list: list(Optional[dict, *to_dict])
"""
result = {}
for obj in base_list:
if isinstance(obj, dict):
- result[obj['name']] = obj
+ result[obj[field_to_merge]] = obj
elif hasattr(obj, "to_dict"):
- result[obj.name] = obj
+ result[getattr(obj, field_to_merge)] = obj
else:
raise AirflowConfigException("Trying to merge invalid object {}".format(obj))
return result
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 875a24c..e8d4d22 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -31,7 +31,7 @@ from requests.exceptions import BaseHTTPError
from airflow import AirflowException
from airflow import settings
from airflow.contrib.kubernetes.pod import (
- Pod, _extract_env_vars_and_secrets, _extract_volumes_and_secrets, _extract_volume_mounts,
+ Pod, _extract_env_vars_and_secrets, _extract_volumes, _extract_volume_mounts,
_extract_ports, _extract_security_context
)
from airflow.kubernetes.kube_client import get_kube_client
@@ -300,8 +300,7 @@ def _convert_to_airflow_pod(pod):
"""
base_container = pod.spec.containers[0] # type: k8s.V1Container
env_vars, secrets = _extract_env_vars_and_secrets(base_container.env)
- volumes, vol_secrets = _extract_volumes_and_secrets(pod.spec.volumes, base_container.volume_mounts)
- secrets.extend(vol_secrets)
+ volumes = _extract_volumes(pod.spec.volumes)
api_client = ApiClient()
init_containers = pod.spec.init_containers
if pod.spec.init_containers is not None:
diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py
index 4eae2ef..5214d4c 100644
--- a/airflow/kubernetes/worker_configuration.py
+++ b/airflow/kubernetes/worker_configuration.py
@@ -435,7 +435,6 @@ class WorkerConfiguration(LoggingMixin):
"""Creates POD."""
if self.kube_config.pod_template_file:
return PodGenerator(pod_template_file=self.kube_config.pod_template_file).gen_pod()
-
pod = PodGenerator(
image=self.kube_config.kube_image,
image_pull_policy=self.kube_config.kube_image_pull_policy or 'IfNotPresent',
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 50a1258..89572f9 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -284,6 +284,40 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])
+ def test_pod_with_volume_secret(self):
+ k = KubernetesPodOperator(
+ namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ in_cluster=False,
+ labels={"foo": "bar"},
+ arguments=["echo 10"],
+ secrets=[Secret(
+ deploy_type="volume",
+ deploy_target="/var/location",
+ secret="my-secret",
+ key="content.json",
+ )],
+ name="airflow-test-pod",
+ task_id="task",
+ get_logs=True,
+ is_delete_operator_pod=True,
+ )
+
+ context = self.create_context(k)
+ k.execute(context)
+ actual_pod = self.api_client.sanitize_for_serialization(k.pod)
+ self.expected_pod['spec']['containers'][0]['volumeMounts'] = [
+ {'mountPath': '/var/location',
+ 'name': mock.ANY,
+ 'readOnly': True}]
+ self.expected_pod['spec']['volumes'] = [
+ {'name': mock.ANY,
+ 'secret': {'secretName': 'my-secret'}}
+ ]
+ self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
+ self.assertEqual(self.expected_pod['metadata']['labels'], actual_pod['metadata']['labels'])
+
def test_pod_hostnetwork(self):
k = KubernetesPodOperator(
namespace='default',
diff --git a/scripts/ci/kubernetes/secrets.yaml b/scripts/ci/kubernetes/secrets.yaml
new file mode 100644
index 0000000..de7496a
--- /dev/null
+++ b/scripts/ci/kubernetes/secrets.yaml
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+apiVersion: v1
+kind: Secret
+metadata:
+ name: my-secret
+type: Opaque
+data:
+ username: YWlyZmxvdw==
+ password: YWlyZmxvdw==
diff --git a/scripts/ci/libraries/_kind.sh b/scripts/ci/libraries/_kind.sh
index 173af6d..889212f 100644
--- a/scripts/ci/libraries/_kind.sh
+++ b/scripts/ci/libraries/_kind.sh
@@ -314,6 +314,7 @@ function deploy_test_kubernetes_resources() {
echo "Deploying Custom kubernetes resources"
echo
verbose_kubectl apply -f "scripts/ci/kubernetes/volumes.yaml" --namespace default
+ verbose_kubectl apply -f "scripts/ci/kubernetes/secrets.yaml" --namespace default
}
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index bb714d4..43598ce 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -607,9 +607,6 @@ class TestPodGenerator(unittest.TestCase):
}],
'volumeMounts': [{
'mountPath': '/foo/',
- 'name': 'example-kubernetes-test-volume1'
- }, {
- 'mountPath': '/foo/',
'name': 'example-kubernetes-test-volume2'
}]
}],
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index 64c24c6..f56615f 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -242,13 +242,12 @@ class TestPodLauncherHelper(unittest.TestCase):
name="airflow-secret",
secret=k8s.V1SecretVolumeSource(
secret_name="secret-name",
-
)
),
k8s.V1Volume(
name="init-secret",
secret=k8s.V1SecretVolumeSource(
- secret_name="secret-name",
+ secret_name="init-secret",
)
)
]
@@ -283,22 +282,26 @@ class TestPodLauncherHelper(unittest.TestCase):
),
VolumeMount(
name="airflow-secret",
- read_only=True,
mount_path="/opt/mount",
sub_path=None,
+ read_only=True
)],
- secrets=[Secret("env", "AIRFLOW_SECRET", "ai", "secret_key"),
- Secret('volume', '/opt/mount', 'airflow-secret', "secret-name"),
- Secret('volume', None, 'init-secret', 'secret-name')],
+ secrets=[Secret("env", "AIRFLOW_SECRET", "ai", "secret_key")],
security_context={'fsGroup': 0, 'runAsUser': 0},
volumes=[Volume(name="myvolume", configs={'name': 'myvolume'}),
Volume(name="airflow-config", configs={'configMap': {'data': 'airflow-data'},
- 'name': 'airflow-config'})]
+ 'name': 'airflow-config'}),
+ Volume(name='airflow-secret', configs={'name': 'airflow-secret',
+ 'secret': {'secretName': 'secret-name'}}),
+ Volume(name='init-secret', configs={'name': 'init-secret', 'secret':
+ {'secretName': 'init-secret'}})],
)
expected_dict = expected.as_dict()
result_dict = result_pod.as_dict()
+ print(result_pod.volume_mounts)
parsed_configs = self.pull_out_volumes(result_dict)
result_dict['volumes'] = parsed_configs
+ self.assertEqual(result_dict['secrets'], expected_dict['secrets'])
self.assertDictEqual(expected_dict, result_dict)
@staticmethod
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
index 48dc3e3..59c9326 100644
--- a/tests/kubernetes/test_worker_configuration.py
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -355,6 +355,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
self.kube_config.worker_run_as_user = 0
self.kube_config.dags_volume_claim = None
self.kube_config.dags_volume_host = None
+ self.kube_config.base_log_folder = '/logs'
self.kube_config.dags_in_image = None
self.kube_config.worker_fs_group = None
self.kube_config.git_dags_folder_mount_point = 'dags'
@@ -369,6 +370,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
def test_make_pod_assert_labels(self):
# Tests the pod created has all the expected labels set
self.kube_config.dags_folder = 'dags'
+ self.kube_config.base_log_folder = '/logs'
worker_config = WorkerConfiguration(self.kube_config)
pod = PodGenerator.construct_pod(
@@ -402,6 +404,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
self.kube_config.dags_volume_host = None
self.kube_config.dags_in_image = None
self.kube_config.worker_fs_group = None
+ self.kube_config.base_log_folder = '/logs'
self.kube_config.git_dags_folder_mount_point = 'dags'
self.kube_config.git_sync_dest = 'repo'
self.kube_config.git_subpath = 'path'
@@ -431,6 +434,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
self.kube_config.dags_volume_host = None
self.kube_config.dags_in_image = None
self.kube_config.worker_fs_group = None
+ self.kube_config.base_log_folder = '/logs'
self.kube_config.git_dags_folder_mount_point = 'dags'
self.kube_config.git_sync_dest = 'repo'
self.kube_config.git_subpath = 'path'
@@ -469,6 +473,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
self.kube_config.dags_volume_host = None
self.kube_config.dags_in_image = None
self.kube_config.worker_fs_group = None
+ self.kube_config.base_log_folder = '/logs'
self.kube_config.git_dags_folder_mount_point = 'dags'
self.kube_config.git_sync_dest = 'repo'
self.kube_config.git_subpath = 'path'
@@ -515,6 +520,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
self.kube_config.kube_tolerations = self.tolerations_config
self.kube_config.kube_annotations = self.worker_annotations_config
self.kube_config.dags_folder = 'dags'
+ self.kube_config.base_log_folder = '/logs'
worker_config = WorkerConfiguration(self.kube_config)
pod = worker_config.as_pod()
@@ -533,6 +539,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
def test_make_pod_with_executor_config(self):
self.kube_config.dags_folder = 'dags'
+ self.kube_config.base_log_folder = '/logs'
worker_config = WorkerConfiguration(self.kube_config)
config_pod = PodGenerator(
image='',
@@ -722,6 +729,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
configmap than airflow_configmap (airflow.cfg)
"""
self.kube_config.airflow_home = '/usr/local/airflow'
+ self.kube_config.base_log_folder = '/logs'
self.kube_config.airflow_configmap = 'airflow-configmap'
self.kube_config.airflow_local_settings_configmap = 'airflow-ls-configmap'
self.kube_config.dags_folder = '/workers/path/to/dags'
@@ -792,6 +800,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
self.kube_config.dags_volume_host = None
self.kube_config.dags_in_image = None
self.kube_config.git_dags_folder_mount_point = 'dags'
+ self.kube_config.base_log_folder = '/logs'
self.kube_config.git_sync_dest = 'repo'
self.kube_config.git_subpath = 'path'
self.kube_config.image_pull_secrets = 'image_pull_secret1,image_pull_secret2'
diff --git a/tests/test_local_settings/test_local_settings.py b/tests/test_local_settings/test_local_settings.py
index 7c4abf1..9fab355 100644
--- a/tests/test_local_settings/test_local_settings.py
+++ b/tests/test_local_settings/test_local_settings.py
@@ -338,14 +338,14 @@ class LocalSettingsTest(unittest.TestCase):
'resources': {'limits': {'nvidia.com/gpu': '200G'},
'requests': {'cpu': '200Mi',
'memory': '2G'}},
- 'volumeMounts': [{'mountPath': '/opt/airflow/secrets/',
- 'name': 'airflow-secrets-mount',
- 'readOnly': True},
- {'mountPath': '/mnt',
- 'name': 'foo',
- 'readOnly': True,
- 'subPath': '/'}
- ]}],
+ 'volumeMounts': [
+ {'mountPath': '/mnt',
+ 'name': 'foo',
+ 'readOnly': True,
+ 'subPath': '/'},
+ {'mountPath': '/opt/airflow/secrets/',
+ 'name': 'airflow-secrets-mount',
+ 'readOnly': True}]}],
'hostNetwork': False,
'imagePullSecrets': [],
'initContainers': [{'name': 'init-container',