You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/06/29 13:21:31 UTC
[airflow] 25/37: [AIRFLOW-6778] Add a configurable DAGs volume mount path for Kubernetes (#8147)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 33992dc693f51d0dd7571d9b377bfd944181ab90
Author: Brandon T. Willard <97...@users.noreply.github.com>
AuthorDate: Tue Apr 7 15:35:09 2020 -0500
[AIRFLOW-6778] Add a configurable DAGs volume mount path for Kubernetes (#8147)
(cherry picked from commit 75896c30cf37002585e3b17efa002da279090f76)
---
airflow/config_templates/config.yml | 7 +++++++
airflow/config_templates/default_airflow.cfg | 3 +++
airflow/executors/kubernetes_executor.py | 2 ++
airflow/kubernetes/worker_configuration.py | 5 +++++
tests/kubernetes/test_worker_configuration.py | 6 ++++++
5 files changed, 23 insertions(+)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index a7fa7a6..9b63200 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1827,6 +1827,13 @@
type: string
example: ~
default: ""
+ - name: dags_volume_mount_point
+ description: |
+ For either git sync or volume mounted DAGs, the worker will mount the volume in this path
+ version_added: ~
+ type: string
+ example: ~
+ default: ""
- name: dags_volume_claim
description: |
For DAGs mounted via a volume claim (mutually exclusive with git-sync and host path)
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 27fe92a..ca9de12 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -853,6 +853,9 @@ dags_in_image = False
# For either git sync or volume mounted DAGs, the worker will look in this subpath for DAGs
dags_volume_subpath =
+# For either git sync or volume mounted DAGs, the worker will mount the volume in this path
+dags_volume_mount_point =
+
# For DAGs mounted via a volume claim (mutually exclusive with git-sync and host path)
dags_volume_claim =
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 6ec2660..98e3154 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -125,6 +125,8 @@ class KubeConfig:
# DAGs directly
self.dags_volume_claim = conf.get(self.kubernetes_section, 'dags_volume_claim')
+ self.dags_volume_mount_point = conf.get(self.kubernetes_section, 'dags_volume_mount_point')
+
# This prop may optionally be set for PV Claims and is used to write logs
self.logs_volume_claim = conf.get(self.kubernetes_section, 'logs_volume_claim')
diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py
index 820763b..9c35910 100644
--- a/airflow/kubernetes/worker_configuration.py
+++ b/airflow/kubernetes/worker_configuration.py
@@ -412,6 +412,11 @@ class WorkerConfiguration(LoggingMixin):
return list(volumes.values())
def generate_dag_volume_mount_path(self):
+ """Generate path for DAG volume"""
+
+ if self.kube_config.dags_volume_mount_point:
+ return self.kube_config.dags_volume_mount_point
+
if self.kube_config.dags_volume_claim or self.kube_config.dags_volume_host:
return self.worker_airflow_dags
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
index 73b3f20..0730595 100644
--- a/tests/kubernetes/test_worker_configuration.py
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -88,6 +88,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
self.kube_config.airflow_dags = 'dags'
self.kube_config.airflow_logs = 'logs'
self.kube_config.dags_volume_subpath = None
+ self.kube_config.dags_volume_mount_point = None
self.kube_config.logs_volume_subpath = None
self.kube_config.dags_in_image = False
self.kube_config.dags_folder = None
@@ -145,6 +146,11 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder)
+ self.kube_config.dags_volume_mount_point = '/root/airflow/package'
+ dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
+ self.assertEqual(dag_volume_mount_path, '/root/airflow/package')
+ self.kube_config.dags_volume_mount_point = ''
+
self.kube_config.dags_volume_claim = ''
self.kube_config.dags_volume_host = '/host/airflow/dags'
dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()