You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2017/12/11 17:56:38 UTC

incubator-airflow git commit: [AIRFLOW-1840] Make celery configuration congruent with Celery 4

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 02ff8ae35 -> 30076f1e4


[AIRFLOW-1840] Make celery configuration congruent with Celery 4

Explicitly set the Celery result backend from the config
and align the Airflow config option names
with the Celery 4 config names, as the mismatch might be confusing.

Closes #2806 from Fokko/AIRFLOW-1840-Fix-celery-config


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/30076f1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/30076f1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/30076f1e

Branch: refs/heads/master
Commit: 30076f1e45c7db54b0ac4bc118617d9a00cb5b7d
Parents: 02ff8ae
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Mon Dec 11 18:56:29 2017 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Dec 11 18:56:29 2017 +0100

----------------------------------------------------------------------
 UPDATING.md                                  | 11 +++++++++++
 airflow/config_templates/default_airflow.cfg | 18 +++++++++++++++---
 airflow/config_templates/default_celery.py   | 21 ++++++++++-----------
 airflow/configuration.py                     |  2 +-
 docs/configuration.rst                       |  2 +-
 scripts/ci/airflow_travis.cfg                |  3 +--
 tests/executors/test_celery_executor.py      |  8 +++-----
 7 files changed, 42 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/30076f1e/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 3c7d549..2fec5e9 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -3,6 +3,17 @@
 This file documents any backwards-incompatible changes in Airflow and
 assists people when migrating to a new version.
 
+## Airflow 1.9.1
+
+### Celery config
+
+To make the config of Airflow compatible with Celery, some properties have been renamed:
+```
+celeryd_concurrency -> worker_concurrency
+celery_result_backend -> result_backend
+```
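+The SSL options have been renamed in the same way (celery_ssl_active -> ssl_active, celery_ssl_key -> ssl_key, celery_ssl_cert -> ssl_cert, celery_ssl_cacert -> ssl_cacert). This results in the same config parameters as Celery 4 and makes the configuration more transparent.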
+
 ## Airflow 1.9
 
 ### SSH Hook updates, along with new SSH Operator & SFTP Operator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/30076f1e/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index a02bb39..e75cf9a 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -280,7 +280,7 @@ celery_app_name = airflow.executors.celery_executor
 # "airflow worker" command. This defines the number of task instances that
 # a worker will take, so size up your workers based on the resources on
 # your worker box and the nature of your tasks
-celeryd_concurrency = 16
+worker_concurrency = 16
 
 # When you start an airflow worker, airflow starts a tiny web server
 # subprocess to serve the workers local log files to the airflow main
@@ -292,10 +292,16 @@ worker_log_server_port = 8793
 # The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
 # a sqlalchemy database. Refer to the Celery documentation for more
 # information.
+# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
 broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
 
-# Another key Celery setting
-celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
+# The Celery result_backend. When a job finishes, it needs to update the
+# metadata of the job. Therefore it will post a message on a message bus,
+# or insert it into a database (depending on the backend).
+# This status is used by the scheduler to update the state of the task
+# The use of a database is highly recommended
+# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
+result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
 
 # Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
 # it `airflow flower`. This defines the IP that Celery Flower runs on
@@ -317,6 +323,12 @@ celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_C
 # ETA you're planning to use. Especially important in case of using Redis or SQS
 visibility_timeout = 21600
 
+# In case of using SSL
+ssl_active = False
+ssl_key =
+ssl_cert =
+ssl_cacert =
+
 [dask]
 # This section only applies if you are using the DaskExecutor in
 # [core] section above

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/30076f1e/airflow/config_templates/default_celery.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
index 4f8c92d..3309cbe 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -14,8 +14,8 @@
 
 import ssl
 
-from airflow.exceptions import AirflowConfigException, AirflowException
 from airflow import configuration
+from airflow.exceptions import AirflowConfigException, AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 log = LoggingMixin().log
@@ -27,34 +27,33 @@ if broker_transport_options is None:
 DEFAULT_CELERY_CONFIG = {
     'accept_content': ['json', 'pickle'],
     'event_serializer': 'json',
-    'result_serializer': 'pickle',
     'worker_prefetch_multiplier': 1,
     'task_acks_late': True,
     'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'),
     'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'),
     'broker_url': configuration.get('celery', 'BROKER_URL'),
     'broker_transport_options': {'visibility_timeout': broker_transport_options},
-    'result_backend': configuration.get('celery', 'CELERY_RESULT_BACKEND'),
-    'worker_concurrency': configuration.getint('celery', 'CELERYD_CONCURRENCY'),
+    'result_backend': configuration.get('celery', 'RESULT_BACKEND'),
+    'worker_concurrency': configuration.getint('celery', 'WORKER_CONCURRENCY'),
 }
 
 celery_ssl_active = False
 try:
-    celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE')
+    celery_ssl_active = configuration.getboolean('celery', 'SSL_ACTIVE')
 except AirflowConfigException as e:
     log.warning("Celery Executor will run without SSL")
 
 try:
     if celery_ssl_active:
-        broker_use_ssl = {'keyfile': configuration.get('celery', 'CELERY_SSL_KEY'),
-                          'certfile': configuration.get('celery', 'CELERY_SSL_CERT'),
-                          'ca_certs': configuration.get('celery', 'CELERY_SSL_CACERT'),
+        broker_use_ssl = {'keyfile': configuration.get('celery', 'SSL_KEY'),
+                          'certfile': configuration.get('celery', 'SSL_CERT'),
+                          'ca_certs': configuration.get('celery', 'SSL_CACERT'),
                           'cert_reqs': ssl.CERT_REQUIRED}
         DEFAULT_CELERY_CONFIG['broker_use_ssl'] = broker_use_ssl
 except AirflowConfigException as e:
-    raise AirflowException('AirflowConfigException: CELERY_SSL_ACTIVE is True, '
-                           'please ensure CELERY_SSL_KEY, '
-                           'CELERY_SSL_CERT and CELERY_SSL_CACERT are set')
+    raise AirflowException('AirflowConfigException: SSL_ACTIVE is True, '
+                           'please ensure SSL_KEY, '
+                           'SSL_CERT and SSL_CACERT are set')
 except Exception as e:
     raise AirflowException('Exception: There was an unknown Celery SSL Error. '
                            'Please ensure you want to use '

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/30076f1e/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 84913ff..ed63952 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -116,7 +116,7 @@ class AirflowConfigParser(ConfigParser):
         ('core', 'sql_alchemy_conn'),
         ('core', 'fernet_key'),
         ('celery', 'broker_url'),
-        ('celery', 'celery_result_backend')
+        ('celery', 'result_backend')
     }
 
     def __init__(self, *args, **kwargs):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/30076f1e/docs/configuration.rst
----------------------------------------------------------------------
diff --git a/docs/configuration.rst b/docs/configuration.rst
index 35616f2..51984e0 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -35,7 +35,7 @@ You can also derive the connection string at run time by appending ``_cmd`` to t
     [core]
     sql_alchemy_conn_cmd = bash_command_to_run
 
-But only three such configuration elements namely sql_alchemy_conn, broker_url and celery_result_backend can be fetched as a command. The idea behind this is to not store passwords on boxes in plain text files. The order of precedence is as follows -
+But only three such configuration elements namely sql_alchemy_conn, broker_url and result_backend can be fetched as a command. The idea behind this is to not store passwords on boxes in plain text files. The order of precedence is as follows -
 
 1. environment variable
 2. configuration in airflow.cfg

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/30076f1e/scripts/ci/airflow_travis.cfg
----------------------------------------------------------------------
diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg
index 6a8db93..b71947e 100644
--- a/scripts/ci/airflow_travis.cfg
+++ b/scripts/ci/airflow_travis.cfg
@@ -46,7 +46,7 @@ celery_app_name = airflow.executors.celery_executor
 celeryd_concurrency = 16
 worker_log_server_port = 8793
 broker_url = amqp://guest:guest@localhost:5672/
-celery_result_backend = db+mysql://root@localhost/airflow
+result_backend = db+mysql://root@localhost/airflow
 flower_port = 5555
 default_queue = default
 
@@ -55,4 +55,3 @@ job_heartbeat_sec = 1
 scheduler_heartbeat_sec = 5
 authenticate = true
 max_threads = 2
-

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/30076f1e/tests/executors/test_celery_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 1c411e7..9abc60c 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -11,20 +11,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import unittest
 import sys
+import unittest
+from celery.contrib.testing.worker import start_worker
 
-from airflow.executors.celery_executor import app
 from airflow.executors.celery_executor import CeleryExecutor
+from airflow.executors.celery_executor import app
 from airflow.utils.state import State
-from celery.contrib.testing.worker import start_worker
 
 # leave this it is used by the test worker
 import celery.contrib.testing.tasks
 
-
 class CeleryExecutorTest(unittest.TestCase):
-
     def test_celery_integration(self):
         executor = CeleryExecutor()
         executor.start()