Posted to commits@airflow.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/08/06 20:45:00 UTC

[jira] [Commented] (AIRFLOW-2755) k8s workers think DAGs are always in `/tmp/dags`

    [ https://issues.apache.org/jira/browse/AIRFLOW-2755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570751#comment-16570751 ] 

ASF GitHub Bot commented on AIRFLOW-2755:
-----------------------------------------

Fokko closed pull request #3612: [AIRFLOW-2755] Added `kubernetes.worker_dags_folder` configuration
URL: https://github.com/apache/incubator-airflow/pull/3612

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index be286ea3dc..d4a7242118 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -562,6 +562,7 @@ elasticsearch_end_of_log_mark = end_of_log
 worker_container_repository =
 worker_container_tag =
 worker_container_image_pull_policy = IfNotPresent
+worker_dags_folder =
 
 # If True (default), worker pods will be deleted upon termination
 delete_worker_pods = True
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index 788d925c38..66c600ba65 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -115,6 +115,8 @@ def __init__(self):
             self.kubernetes_section, 'worker_container_repository')
         self.worker_container_tag = configuration.get(
             self.kubernetes_section, 'worker_container_tag')
+        self.worker_dags_folder = configuration.get(
+            self.kubernetes_section, 'worker_dags_folder')
         self.kube_image = '{}:{}'.format(
             self.worker_container_repository, self.worker_container_tag)
         self.kube_image_pull_policy = configuration.get(
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index c9f86b047a..88a5cf0a40 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -81,12 +81,14 @@ def _get_init_containers(self, volume_mounts):
     def _get_environment(self):
         """Defines any necessary environment variables for the pod executor"""
         env = {
-            'AIRFLOW__CORE__DAGS_FOLDER': '/tmp/dags',
-            'AIRFLOW__CORE__EXECUTOR': 'LocalExecutor',
-            'AIRFLOW__CORE__SQL_ALCHEMY_CONN': conf.get('core', 'SQL_ALCHEMY_CONN')
+            "AIRFLOW__CORE__EXECUTOR": "LocalExecutor",
+            "AIRFLOW__CORE__SQL_ALCHEMY_CONN": conf.get("core", "SQL_ALCHEMY_CONN"),
         }
+
         if self.kube_config.airflow_configmap:
             env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.worker_airflow_home
+        if self.kube_config.worker_dags_folder:
+            env['AIRFLOW__CORE__DAGS_FOLDER'] = self.kube_config.worker_dags_folder
         return env
 
     def _get_secrets(self):
diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml
index 97556bf840..3e64ae4e47 100644
--- a/scripts/ci/kubernetes/kube/configmaps.yaml
+++ b/scripts/ci/kubernetes/kube/configmaps.yaml
@@ -179,6 +179,7 @@ data:
     worker_container_repository = airflow
     worker_container_tag = latest
     worker_container_image_pull_policy = IfNotPresent
+    worker_dags_folder = /tmp/dags
     delete_worker_pods = True
     git_repo = https://github.com/apache/incubator-airflow.git
     git_branch = master
diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py
index 963efcb03b..d9da48ce3b 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/contrib/executors/test_kubernetes_executor.py
@@ -133,6 +133,22 @@ def test_worker_with_subpaths(self):
                     "subPath should've been passed to volumeMount configuration"
                 )
 
+    def test_worker_environment_no_dags_folder(self):
+        self.kube_config.worker_dags_folder = ''
+        worker_config = WorkerConfiguration(self.kube_config)
+        env = worker_config._get_environment()
+
+        self.assertNotIn('AIRFLOW__CORE__DAGS_FOLDER', env)
+
+    def test_worker_environment_when_dags_folder_specified(self):
+        dags_folder = '/workers/path/to/dags'
+        self.kube_config.worker_dags_folder = dags_folder
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        env = worker_config._get_environment()
+
+        self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
+
 
 if __name__ == '__main__':
     unittest.main()
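
For reference, a minimal `[kubernetes]` section using the new option might look like the sketch below; the repository, tag, and DAGs path are placeholder values, not values taken from this PR:

    [kubernetes]
    worker_container_repository = my-registry/airflow
    worker_container_tag = latest
    worker_container_image_pull_policy = IfNotPresent
    # Where worker pods should look for DAGs. If left empty, the executor
    # no longer forces AIRFLOW__CORE__DAGS_FOLDER to /tmp/dags.
    worker_dags_folder = /usr/local/airflow/dags
    delete_worker_pods = True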

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> k8s workers think DAGs are always in `/tmp/dags`
> ------------------------------------------------
>
>                 Key: AIRFLOW-2755
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2755
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: configuration, worker
>            Reporter: Aldo
>            Assignee: Aldo
>            Priority: Minor
>             Fix For: 2.0.0
>
>
> We have Airflow configured to use the `KubernetesExecutor` and run tasks in newly created pods.
> I tried to use the `PythonOperator` with a Python callable imported from a Python module located in the DAGs directory, as [that should be possible|https://github.com/apache/incubator-airflow/blob/c7a472ed6b0d8a4720f57ba1140c8cf665757167/airflow/__init__.py#L42]. Airflow complained that the module was not found.
> After a fair amount of digging, we found that the workers have the `AIRFLOW__CORE__DAGS_FOLDER` environment variable hardcoded to `/tmp/dags`, as [you can see in the code|https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/kubernetes/worker_configuration.py#L84].
> Unsetting that environment variable from within the task's pod and running the task manually made it work as expected. I think this path should be configurable (I'll have a go at adding a `kubernetes.worker_dags_folder` configuration option).
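
To make the reported failure concrete, here is a minimal sketch of the kind of DAG involved; `my_helpers` and `hello` are hypothetical names, not code from this PR. The bare import only resolves inside the worker pod when `AIRFLOW__CORE__DAGS_FOLDER` points at the directory that actually contains the helper module:

    # dags/my_dag.py -- hypothetical illustration
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    # Airflow puts the DAGs folder on sys.path, so this import resolves
    # only if the worker's DAGS_FOLDER is the directory containing
    # my_helpers.py (with the old hardcoded /tmp/dags, it usually isn't).
    from my_helpers import hello  # dags/my_helpers.py, hypothetical

    dag = DAG(
        dag_id='example_import_from_dags_folder',
        start_date=datetime(2018, 1, 1),
        schedule_interval=None,
    )

    run_hello = PythonOperator(
        task_id='run_hello',
        python_callable=hello,
        dag=dag,
    )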



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)