You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/12/12 11:44:12 UTC
incubator-airflow git commit: [AIRFLOW-1908] Fix celery broker
options config load
Repository: incubator-airflow
Updated Branches:
refs/heads/master 683a27f2c -> 22453d037
[AIRFLOW-1908] Fix celery broker options config load
Options were set to visibility timeout instead of
broker_options
directly. Furthermore, options should be int,
float, bool or string
not all string.
Closes #2867 from bolkedebruin/AIRFLOW-1908
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/22453d03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/22453d03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/22453d03
Branch: refs/heads/master
Commit: 22453d037ec69b3e5ab1cda4717a3dea9c47df56
Parents: 683a27f
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Dec 12 12:44:06 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Dec 12 12:44:06 2017 +0100
----------------------------------------------------------------------
airflow/config_templates/default_celery.py | 2 +-
airflow/configuration.py | 23 ++++++++++++++++++++++-
scripts/ci/airflow_travis.cfg | 6 ++++++
tests/configuration.py | 14 +++++++++++++-
4 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/airflow/config_templates/default_celery.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
index 3309cbe..57b9611 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -32,7 +32,7 @@ DEFAULT_CELERY_CONFIG = {
'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'),
'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'),
'broker_url': configuration.get('celery', 'BROKER_URL'),
- 'broker_transport_options': {'visibility_timeout': broker_transport_options},
+ 'broker_transport_options': broker_transport_options,
'result_backend': configuration.get('celery', 'RESULT_BACKEND'),
'worker_concurrency': configuration.getint('celery', 'WORKER_CONCURRENCY'),
}
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index ed63952..2bb2a49 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -28,6 +28,8 @@ import sys
from future import standard_library
+from six import iteritems
+
from airflow.utils.log.logging_mixin import LoggingMixin
standard_library.install_aliases()
@@ -237,8 +239,27 @@ class AirflowConfigParser(ConfigParser):
self._validate()
def getsection(self, section):
+ """
+ Returns the section as a dict. Values are converted to int, float, bool
+ as required.
+ :param section: section from the config
+ :return: dict
+ """
if section in self._sections:
- return self._sections[section]
+ _section = self._sections[section]
+ for key, val in iteritems(self._sections[section]):
+ try:
+ val = int(val)
+ except ValueError:
+ try:
+ val = float(val)
+ except ValueError:
+ if val.lower() in ('t', 'true'):
+ val = True
+ elif val.lower() in ('f', 'false'):
+ val = False
+ _section[key] = val
+ return _section
return None
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/scripts/ci/airflow_travis.cfg
----------------------------------------------------------------------
diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg
index b71947e..ee29148 100644
--- a/scripts/ci/airflow_travis.cfg
+++ b/scripts/ci/airflow_travis.cfg
@@ -50,6 +50,12 @@ result_backend = db+mysql://root@localhost/airflow
flower_port = 5555
default_queue = default
+[celery_broker_transport_options]
+visibility_timeout = 21600
+_test_only_bool = True
+_test_only_float = 12.0
+_test_only_string = this is a test
+
[scheduler]
job_heartbeat_sec = 1
scheduler_heartbeat_sec = 5
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/tests/configuration.py
----------------------------------------------------------------------
diff --git a/tests/configuration.py b/tests/configuration.py
index bb0fd17..300205b 100644
--- a/tests/configuration.py
+++ b/tests/configuration.py
@@ -13,12 +13,14 @@
# limitations under the License.
from __future__ import print_function
-import os
import unittest
+import six
+
from airflow import configuration
from airflow.configuration import conf
+
class ConfTest(unittest.TestCase):
def setup(self):
@@ -52,3 +54,13 @@ class ConfTest(unittest.TestCase):
cfg_dict = conf.as_dict(display_sensitive=True, display_source=True)
self.assertEqual(
cfg_dict['testsection']['testkey'], ('testvalue', 'env var'))
+
+ def test_broker_transport_options(self):
+ section_dict = conf.getsection("celery_broker_transport_options")
+ self.assertTrue(isinstance(section_dict['visibility_timeout'], int))
+
+ self.assertTrue(isinstance(section_dict['_test_only_bool'], bool))
+
+ self.assertTrue(isinstance(section_dict['_test_only_float'], float))
+
+ self.assertTrue(isinstance(section_dict['_test_only_string'], six.string_types))