You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/28 13:48:57 UTC

incubator-airflow git commit: [AIRFLOW-1650] Fix custom celery config loading

Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-9-test b1e5c6ede -> b5d1657e3


[AIRFLOW-1650] Fix custom celery config loading

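Loading a custom Celery config was broken: the value of the
`celery_config_options` setting was used as a plain string
instead of being resolved to the object it names. This fixes it
by importing the dotted path, i.e. loading the module and
returning the named attribute. Inspired by Django's module loading.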

(cherry picked from commit 3aa05cb227a12dd8ec3f375b8444adad67a2718d)
Signed-off-by: Bolke de Bruin <bo...@xs4all.nl>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b5d1657e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b5d1657e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b5d1657e

Branch: refs/heads/v1-9-test
Commit: b5d1657e3540114288d0d8c65e4ce9ca00fba360
Parents: b1e5c6e
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Sep 27 20:03:56 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Sep 28 15:48:48 2017 +0200

----------------------------------------------------------------------
 airflow/executors/celery_executor.py |  6 ++++--
 airflow/utils/module_loading.py      | 35 +++++++++++++++++++++++++++++++
 tests/utils/test_module_loading.py   | 30 ++++++++++++++++++++++++++
 3 files changed, 69 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b5d1657e/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 7e363db..d3809b3 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -23,7 +23,7 @@ from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor
 from airflow import configuration
 from airflow.utils.log.logging_mixin import LoggingMixin
-
+from airflow.utils.module_loading import import_string
 
 PARALLELISM = configuration.get('core', 'PARALLELISM')
 
@@ -33,7 +33,9 @@ airflow worker
 '''
 
 if configuration.has_option('celery', 'celery_config_options'):
-    celery_configuration = configuration.get('celery', 'celery_config_options')
+    celery_configuration = import_string(
+        configuration.get('celery', 'celery_config_options')
+    )
 else:
     celery_configuration = DEFAULT_CELERY_CONFIG
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b5d1657e/airflow/utils/module_loading.py
----------------------------------------------------------------------
diff --git a/airflow/utils/module_loading.py b/airflow/utils/module_loading.py
new file mode 100644
index 0000000..f2138ea
--- /dev/null
+++ b/airflow/utils/module_loading.py
@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from importlib import import_module
+
+
+def import_string(dotted_path):
+    """
+    Import a dotted module path and return the attribute/class designated by the
+    last name in the path. Raise ImportError if the import failed.
+    """
+    try:
+        module_path, class_name = dotted_path.rsplit('.', 1)
+    except ValueError:
+        raise ImportError("{} doesn't look like a module path".format(dotted_path))
+
+    module = import_module(module_path)
+
+    try:
+        return getattr(module, class_name)
+    except AttributeError as err:
+        raise ImportError('Module "{}" does not define a "{}" attribute/class'.format(
+            module_path, class_name)
+        )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b5d1657e/tests/utils/test_module_loading.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_module_loading.py b/tests/utils/test_module_loading.py
new file mode 100644
index 0000000..2eafb1f
--- /dev/null
+++ b/tests/utils/test_module_loading.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.utils.module_loading import import_string
+
+
+class ModuleImportTestCase(unittest.TestCase):
+    def test_import_string(self):
+        cls = import_string('airflow.utils.module_loading.import_string')
+        self.assertEqual(cls, import_string)
+
+        # Test exceptions raised
+        with self.assertRaises(ImportError):
+            import_string('no_dots_in_path')
+        msg = 'Module "airflow.utils" does not define a "nonexistent" attribute'
+        with self.assertRaisesRegexp(ImportError, msg):
+            import_string('airflow.utils.nonexistent')