You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/28 13:48:57 UTC
incubator-airflow git commit: [AIRFLOW-1650] Fix custom celery config
loading
Repository: incubator-airflow
Updated Branches:
refs/heads/v1-9-test b1e5c6ede -> b5d1657e3
[AIRFLOW-1650] Fix custom celery config loading
Celery config loading was broken as it was just passing
a string. This fixes it by loading it as a module with an
attribute. Inspired by Django's module loading.
(cherry picked from commit 3aa05cb227a12dd8ec3f375b8444adad67a2718d)
Signed-off-by: Bolke de Bruin <bo...@xs4all.nl>
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b5d1657e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b5d1657e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b5d1657e
Branch: refs/heads/v1-9-test
Commit: b5d1657e3540114288d0d8c65e4ce9ca00fba360
Parents: b1e5c6e
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Sep 27 20:03:56 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Sep 28 15:48:48 2017 +0200
----------------------------------------------------------------------
airflow/executors/celery_executor.py | 6 ++++--
airflow/utils/module_loading.py | 35 +++++++++++++++++++++++++++++++
tests/utils/test_module_loading.py | 30 ++++++++++++++++++++++++++
3 files changed, 69 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b5d1657e/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 7e363db..d3809b3 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -23,7 +23,7 @@ from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow import configuration
from airflow.utils.log.logging_mixin import LoggingMixin
-
+from airflow.utils.module_loading import import_string
PARALLELISM = configuration.get('core', 'PARALLELISM')
@@ -33,7 +33,9 @@ airflow worker
'''
if configuration.has_option('celery', 'celery_config_options'):
- celery_configuration = configuration.get('celery', 'celery_config_options')
+ celery_configuration = import_string(
+ configuration.get('celery', 'celery_config_options')
+ )
else:
celery_configuration = DEFAULT_CELERY_CONFIG
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b5d1657e/airflow/utils/module_loading.py
----------------------------------------------------------------------
diff --git a/airflow/utils/module_loading.py b/airflow/utils/module_loading.py
new file mode 100644
index 0000000..f2138ea
--- /dev/null
+++ b/airflow/utils/module_loading.py
@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from importlib import import_module
+
+
+def import_string(dotted_path):
+ """
+ Import a dotted module path and return the attribute/class designated by the
+ last name in the path. Raise ImportError if the import failed.
+ """
+ try:
+ module_path, class_name = dotted_path.rsplit('.', 1)
+ except ValueError:
+ raise ImportError("{} doesn't look like a module path".format(dotted_path))
+
+ module = import_module(module_path)
+
+ try:
+ return getattr(module, class_name)
+ except AttributeError as err:
+ raise ImportError('Module "{}" does not define a "{}" attribute/class'.format(
+ module_path, class_name)
+ )
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b5d1657e/tests/utils/test_module_loading.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_module_loading.py b/tests/utils/test_module_loading.py
new file mode 100644
index 0000000..2eafb1f
--- /dev/null
+++ b/tests/utils/test_module_loading.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.utils.module_loading import import_string
+
+
+class ModuleImportTestCase(unittest.TestCase):
+ def test_import_string(self):
+ cls = import_string('airflow.utils.module_loading.import_string')
+ self.assertEqual(cls, import_string)
+
+ # Test exceptions raised
+ with self.assertRaises(ImportError):
+ import_string('no_dots_in_path')
+ msg = 'Module "airflow.utils" does not define a "nonexistent" attribute'
+ with self.assertRaisesRegexp(ImportError, msg):
+ import_string('airflow.utils.nonexistent')