You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/12 17:43:59 UTC

[GitHub] feng-tao closed pull request #3683: [AIRFLOW-2770] kubernetes: add support for dag folder in the docker i…

feng-tao closed pull request #3683: [AIRFLOW-2770] kubernetes: add support for dag folder in the docker i…
URL: https://github.com/apache/incubator-airflow/pull/3683
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 828db54a64..35a8809690 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -572,6 +572,10 @@ namespace = default
 # The name of the Kubernetes ConfigMap Containing the Airflow Configuration (this file)
 airflow_configmap =
 
+# If the docker image already contains the DAGs, set this to `True`, and the worker will search for DAGs in dags_folder;
+# otherwise use git sync or a dags volume claim to mount the DAGs
+dags_in_docker = FALSE
+
 # For either git sync or volume mounted DAGs, the worker will look in this subpath for DAGs
 dags_volume_subpath =
 
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index 788d925c38..a1df83582e 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -128,6 +128,10 @@ def __init__(self):
             self.kubernetes_section, 'worker_service_account_name')
         self.image_pull_secrets = conf.get(self.kubernetes_section, 'image_pull_secrets')
 
+        # NOTE: the user can build the DAGs into the docker image directly;
+        # this will be set to True if so
+        self.dags_in_docker = conf.get(self.kubernetes_section, 'dags_in_docker')
+
         # NOTE: `git_repo` and `git_branch` must be specified together as a pair
         # The http URL of the git repository to clone from
         self.git_repo = conf.get(self.kubernetes_section, 'git_repo')
@@ -195,10 +199,12 @@ def __init__(self):
         self._validate()
 
     def _validate(self):
-        if not self.dags_volume_claim and (not self.git_repo or not self.git_branch):
+        if not self.dags_volume_claim and not self.dags_in_docker \
+                and (not self.git_repo or not self.git_branch):
             raise AirflowConfigException(
                 'In kubernetes mode the following must be set in the `kubernetes` '
-                'config section: `dags_volume_claim` or `git_repo and git_branch`')
+                'config section: `dags_volume_claim` or `git_repo and git_branch` '
+                'or `dags_in_docker`')
 
 
 class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index c9f86b047a..febae8ccb6 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -38,7 +38,7 @@ def __init__(self, kube_config):
     def _get_init_containers(self, volume_mounts):
         """When using git to retrieve the DAGs, use the GitSync Init Container"""
         # If we're using volume claims to mount the dags, no init container is needed
-        if self.kube_config.dags_volume_claim:
+        if self.kube_config.dags_volume_claim or self.kube_config.dags_in_docker:
             return []
 
         # Otherwise, define a git-sync init container
@@ -121,32 +121,19 @@ def _construct_volume(name, claim):
             return volume
 
         volumes = [
-            _construct_volume(
-                dags_volume_name,
-                self.kube_config.dags_volume_claim
-            ),
             _construct_volume(
                 logs_volume_name,
                 self.kube_config.logs_volume_claim
             )
         ]
 
-        dag_volume_mount_path = ""
-
-        if self.kube_config.dags_volume_claim:
-            dag_volume_mount_path = self.worker_airflow_dags
-        else:
-            dag_volume_mount_path = os.path.join(
-                self.worker_airflow_dags,
-                self.kube_config.git_subpath
+        if not self.kube_config.dags_in_docker:
+            volumes.append(
+                _construct_volume(
+                    dags_volume_name,
+                    self.kube_config.dags_volume_claim
+                )
             )
-        dags_volume_mount = {
-            'name': dags_volume_name,
-            'mountPath': dag_volume_mount_path,
-            'readOnly': True,
-        }
-        if self.kube_config.dags_volume_subpath:
-            dags_volume_mount['subPath'] = self.kube_config.dags_volume_subpath
 
         logs_volume_mount = {
             'name': logs_volume_name,
@@ -156,10 +143,28 @@ def _construct_volume(name, claim):
             logs_volume_mount['subPath'] = self.kube_config.logs_volume_subpath
 
         volume_mounts = [
-            dags_volume_mount,
             logs_volume_mount
         ]
 
+        if not self.kube_config.dags_in_docker:
+            dag_volume_mount_path = ""
+
+            if self.kube_config.dags_volume_claim:
+                dag_volume_mount_path = self.worker_airflow_dags
+            else:
+                dag_volume_mount_path = os.path.join(
+                    self.worker_airflow_dags,
+                    self.kube_config.git_subpath
+                )
+            dags_volume_mount = {
+                'name': dags_volume_name,
+                'mountPath': dag_volume_mount_path,
+                'readOnly': True,
+            }
+            if self.kube_config.dags_volume_subpath:
+                dags_volume_mount['subPath'] = self.kube_config.dags_volume_subpath
+            volume_mounts.append(dags_volume_mount)
+
         # Mount the airflow.cfg file via a configmap the user has specified
         if self.kube_config.airflow_configmap:
             config_volume_name = 'airflow-config'
diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml
index 97556bf840..c55302bd5c 100644
--- a/scripts/ci/kubernetes/kube/configmaps.yaml
+++ b/scripts/ci/kubernetes/kube/configmaps.yaml
@@ -180,6 +180,7 @@ data:
     worker_container_tag = latest
     worker_container_image_pull_policy = IfNotPresent
     delete_worker_pods = True
+    dags_in_docker = False
     git_repo = https://github.com/apache/incubator-airflow.git
     git_branch = master
     git_subpath = airflow/example_dags/


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services