You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2017/12/11 17:56:38 UTC
incubator-airflow git commit: [AIRFLOW-1840] Make celery configuration congruent with Celery 4
Repository: incubator-airflow
Updated Branches:
refs/heads/master 02ff8ae35 -> 30076f1e4
[AIRFLOW-1840] Make celery configuration congruent with Celery 4
Explicitly set the celery backend from the config
and align the Airflow config key names with the
Celery config names, as the mismatch might be confusing.
Closes #2806 from Fokko/AIRFLOW-1840-Fix-celery-config
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/30076f1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/30076f1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/30076f1e
Branch: refs/heads/master
Commit: 30076f1e45c7db54b0ac4bc118617d9a00cb5b7d
Parents: 02ff8ae
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Mon Dec 11 18:56:29 2017 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Mon Dec 11 18:56:29 2017 +0100
----------------------------------------------------------------------
UPDATING.md | 11 +++++++++++
airflow/config_templates/default_airflow.cfg | 18 +++++++++++++++---
airflow/config_templates/default_celery.py | 21 ++++++++++-----------
airflow/configuration.py | 2 +-
docs/configuration.rst | 2 +-
scripts/ci/airflow_travis.cfg | 3 +--
tests/executors/test_celery_executor.py | 8 +++-----
7 files changed, 42 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/30076f1e/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 3c7d549..2fec5e9 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -3,6 +3,17 @@
This file documents any backwards-incompatible changes in Airflow and
assists people when migrating to a new version.
+## Airflow 1.9.1
+
+### Celery config
+
+To make the config of Airflow compatible with Celery, some properties have been renamed:
+```
+celeryd_concurrency -> worker_concurrency
+celery_result_backend -> result_backend
+```
+This will result in the same config parameters as Celery 4 and will make it more transparent.
+
## Airflow 1.9
### SSH Hook updates, along with new SSH Operator & SFTP Operator
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/30076f1e/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index a02bb39..e75cf9a 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -280,7 +280,7 @@ celery_app_name = airflow.executors.celery_executor
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
-celeryd_concurrency = 16
+worker_concurrency = 16
# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
@@ -292,10 +292,16 @@ worker_log_server_port = 8793
# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
+# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
-# Another key Celery setting
-celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
+# The Celery result_backend. When a job finishes, it needs to update the
+# metadata of the job. Therefore it will post a message on a message bus,
+# or insert it into a database (depending on the backend)
+# This status is used by the scheduler to update the state of the task
+# The use of a database is highly recommended
+# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
+result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
# it `airflow flower`. This defines the IP that Celery Flower runs on
@@ -317,6 +323,12 @@ celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_C
# ETA you're planning to use. Especially important in case of using Redis or SQS
visibility_timeout = 21600
+# In case of using SSL
+ssl_active = False
+ssl_key =
+ssl_cert =
+ssl_cacert =
+
[dask]
# This section only applies if you are using the DaskExecutor in
# [core] section above
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/30076f1e/airflow/config_templates/default_celery.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
index 4f8c92d..3309cbe 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -14,8 +14,8 @@
import ssl
-from airflow.exceptions import AirflowConfigException, AirflowException
from airflow import configuration
+from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
log = LoggingMixin().log
@@ -27,34 +27,33 @@ if broker_transport_options is None:
DEFAULT_CELERY_CONFIG = {
'accept_content': ['json', 'pickle'],
'event_serializer': 'json',
- 'result_serializer': 'pickle',
'worker_prefetch_multiplier': 1,
'task_acks_late': True,
'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'),
'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'),
'broker_url': configuration.get('celery', 'BROKER_URL'),
'broker_transport_options': {'visibility_timeout': broker_transport_options},
- 'result_backend': configuration.get('celery', 'CELERY_RESULT_BACKEND'),
- 'worker_concurrency': configuration.getint('celery', 'CELERYD_CONCURRENCY'),
+ 'result_backend': configuration.get('celery', 'RESULT_BACKEND'),
+ 'worker_concurrency': configuration.getint('celery', 'WORKER_CONCURRENCY'),
}
celery_ssl_active = False
try:
- celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE')
+ celery_ssl_active = configuration.getboolean('celery', 'SSL_ACTIVE')
except AirflowConfigException as e:
log.warning("Celery Executor will run without SSL")
try:
if celery_ssl_active:
- broker_use_ssl = {'keyfile': configuration.get('celery', 'CELERY_SSL_KEY'),
- 'certfile': configuration.get('celery', 'CELERY_SSL_CERT'),
- 'ca_certs': configuration.get('celery', 'CELERY_SSL_CACERT'),
+ broker_use_ssl = {'keyfile': configuration.get('celery', 'SSL_KEY'),
+ 'certfile': configuration.get('celery', 'SSL_CERT'),
+ 'ca_certs': configuration.get('celery', 'SSL_CACERT'),
'cert_reqs': ssl.CERT_REQUIRED}
DEFAULT_CELERY_CONFIG['broker_use_ssl'] = broker_use_ssl
except AirflowConfigException as e:
- raise AirflowException('AirflowConfigException: CELERY_SSL_ACTIVE is True, '
- 'please ensure CELERY_SSL_KEY, '
- 'CELERY_SSL_CERT and CELERY_SSL_CACERT are set')
+ raise AirflowException('AirflowConfigException: SSL_ACTIVE is True, '
+ 'please ensure SSL_KEY, '
+ 'SSL_CERT and SSL_CACERT are set')
except Exception as e:
raise AirflowException('Exception: There was an unknown Celery SSL Error. '
'Please ensure you want to use '
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/30076f1e/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 84913ff..ed63952 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -116,7 +116,7 @@ class AirflowConfigParser(ConfigParser):
('core', 'sql_alchemy_conn'),
('core', 'fernet_key'),
('celery', 'broker_url'),
- ('celery', 'celery_result_backend')
+ ('celery', 'result_backend')
}
def __init__(self, *args, **kwargs):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/30076f1e/docs/configuration.rst
----------------------------------------------------------------------
diff --git a/docs/configuration.rst b/docs/configuration.rst
index 35616f2..51984e0 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -35,7 +35,7 @@ You can also derive the connection string at run time by appending ``_cmd`` to t
[core]
sql_alchemy_conn_cmd = bash_command_to_run
-But only three such configuration elements namely sql_alchemy_conn, broker_url and celery_result_backend can be fetched as a command. The idea behind this is to not store passwords on boxes in plain text files. The order of precedence is as follows -
+But only three such configuration elements namely sql_alchemy_conn, broker_url and result_backend can be fetched as a command. The idea behind this is to not store passwords on boxes in plain text files. The order of precedence is as follows -
1. environment variable
2. configuration in airflow.cfg
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/30076f1e/scripts/ci/airflow_travis.cfg
----------------------------------------------------------------------
diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg
index 6a8db93..b71947e 100644
--- a/scripts/ci/airflow_travis.cfg
+++ b/scripts/ci/airflow_travis.cfg
@@ -46,7 +46,7 @@ celery_app_name = airflow.executors.celery_executor
celeryd_concurrency = 16
worker_log_server_port = 8793
broker_url = amqp://guest:guest@localhost:5672/
-celery_result_backend = db+mysql://root@localhost/airflow
+result_backend = db+mysql://root@localhost/airflow
flower_port = 5555
default_queue = default
@@ -55,4 +55,3 @@ job_heartbeat_sec = 1
scheduler_heartbeat_sec = 5
authenticate = true
max_threads = 2
-
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/30076f1e/tests/executors/test_celery_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 1c411e7..9abc60c 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -11,20 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import unittest
import sys
+import unittest
+from celery.contrib.testing.worker import start_worker
-from airflow.executors.celery_executor import app
from airflow.executors.celery_executor import CeleryExecutor
+from airflow.executors.celery_executor import app
from airflow.utils.state import State
-from celery.contrib.testing.worker import start_worker
# leave this it is used by the test worker
import celery.contrib.testing.tasks
-
class CeleryExecutorTest(unittest.TestCase):
-
def test_celery_integration(self):
executor = CeleryExecutor()
executor.start()