You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/12/12 11:44:12 UTC

incubator-airflow git commit: [AIRFLOW-1908] Fix celery broker options config load

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 683a27f2c -> 22453d037


[AIRFLOW-1908] Fix celery broker options config load

Options were set to visibility timeout instead of
broker_options
directly. Furthermore, options should be int,
float, bool or string
not all string.

Closes #2867 from bolkedebruin/AIRFLOW-1908


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/22453d03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/22453d03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/22453d03

Branch: refs/heads/master
Commit: 22453d037ec69b3e5ab1cda4717a3dea9c47df56
Parents: 683a27f
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Dec 12 12:44:06 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Dec 12 12:44:06 2017 +0100

----------------------------------------------------------------------
 airflow/config_templates/default_celery.py |  2 +-
 airflow/configuration.py                   | 23 ++++++++++++++++++++++-
 scripts/ci/airflow_travis.cfg              |  6 ++++++
 tests/configuration.py                     | 14 +++++++++++++-
 4 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/airflow/config_templates/default_celery.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
index 3309cbe..57b9611 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -32,7 +32,7 @@ DEFAULT_CELERY_CONFIG = {
     'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'),
     'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'),
     'broker_url': configuration.get('celery', 'BROKER_URL'),
-    'broker_transport_options': {'visibility_timeout': broker_transport_options},
+    'broker_transport_options': broker_transport_options,
     'result_backend': configuration.get('celery', 'RESULT_BACKEND'),
     'worker_concurrency': configuration.getint('celery', 'WORKER_CONCURRENCY'),
 }

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index ed63952..2bb2a49 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -28,6 +28,8 @@ import sys
 
 from future import standard_library
 
+from six import iteritems
+
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 standard_library.install_aliases()
@@ -237,8 +239,27 @@ class AirflowConfigParser(ConfigParser):
         self._validate()
 
     def getsection(self, section):
+        """
+        Returns the section as a dict. Values are converted to int, float, bool
+        as required.
+        :param section: section from the config
+        :return: dict
+        """
         if section in self._sections:
-            return self._sections[section]
+            _section = self._sections[section]
+            for key, val in iteritems(self._sections[section]):
+                try:
+                    val = int(val)
+                except ValueError:
+                    try:
+                        val = float(val)
+                    except ValueError:
+                        if val.lower() in ('t', 'true'):
+                            val = True
+                        elif val.lower() in ('f', 'false'):
+                            val = False
+                _section[key] = val
+            return _section
 
         return None
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/scripts/ci/airflow_travis.cfg
----------------------------------------------------------------------
diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg
index b71947e..ee29148 100644
--- a/scripts/ci/airflow_travis.cfg
+++ b/scripts/ci/airflow_travis.cfg
@@ -50,6 +50,12 @@ result_backend = db+mysql://root@localhost/airflow
 flower_port = 5555
 default_queue = default
 
+[celery_broker_transport_options]
+visibility_timeout = 21600
+_test_only_bool = True
+_test_only_float = 12.0
+_test_only_string = this is a test
+
 [scheduler]
 job_heartbeat_sec = 1
 scheduler_heartbeat_sec = 5

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/tests/configuration.py
----------------------------------------------------------------------
diff --git a/tests/configuration.py b/tests/configuration.py
index bb0fd17..300205b 100644
--- a/tests/configuration.py
+++ b/tests/configuration.py
@@ -13,12 +13,14 @@
 # limitations under the License.
 
 from __future__ import print_function
-import os
 import unittest
 
+import six
+
 from airflow import configuration
 from airflow.configuration import conf
 
+
 class ConfTest(unittest.TestCase):
 
     def setup(self):
@@ -52,3 +54,13 @@ class ConfTest(unittest.TestCase):
         cfg_dict = conf.as_dict(display_sensitive=True, display_source=True)
         self.assertEqual(
             cfg_dict['testsection']['testkey'], ('testvalue', 'env var'))
+
+    def test_broker_transport_options(self):
+        section_dict = conf.getsection("celery_broker_transport_options")
+        self.assertTrue(isinstance(section_dict['visibility_timeout'], int))
+
+        self.assertTrue(isinstance(section_dict['_test_only_bool'], bool))
+
+        self.assertTrue(isinstance(section_dict['_test_only_float'], float))
+
+        self.assertTrue(isinstance(section_dict['_test_only_string'], six.string_types))