You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/30 08:34:47 UTC

incubator-airflow git commit: [AIRFLOW-2519] Fix CeleryExecutor with SQLAlchemy

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 7c3435442 -> 7c1d7db3d


[AIRFLOW-2519] Fix CeleryExecutor with SQLAlchemy

When using a CeleryExecutor with SQLAlchemy specified in broker_url,
such as:

broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow

do not pass invalid options to the sqlalchemy backend.

 - In default_airflow.cfg, comment out visibility_timeout from
   [celery_broker_transport_options].  The user can specify the
   correct values in this section for the celery broker transport
   that they choose.  visibility_timeout is only valid for Redis
   and SQS celery brokers.

 - Move ssl options from [celery_broker_transport_options], where
   they were wrongly placed, into the [celery] section where they
   belong.

Closes #3417 from rodrigc/AIRFLOW-2519


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7c1d7db3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7c1d7db3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7c1d7db3

Branch: refs/heads/master
Commit: 7c1d7db3dba9967f581244ddd721b4477c1577c6
Parents: 7c34354
Author: Craig Rodrigues <ro...@FreeBSD.org>
Authored: Wed May 30 10:34:41 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed May 30 10:34:41 2018 +0200

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg | 23 ++++++++++++++++-------
 airflow/config_templates/default_celery.py   | 14 +++++++++++---
 2 files changed, 27 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c1d7db3/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 60d1156..ebeb2e6 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -365,19 +365,28 @@ default_queue = default
 # Import path for celery configuration options
 celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
 
-[celery_broker_transport_options]
-# The visibility timeout defines the number of seconds to wait for the worker
-# to acknowledge the task before the message is redelivered to another worker.
-# Make sure to increase the visibility timeout to match the time of the longest
-# ETA you're planning to use. Especially important in case of using Redis or SQS
-visibility_timeout = 21600
-
 # In case of using SSL
 ssl_active = False
 ssl_key =
 ssl_cert =
 ssl_cacert =
 
+[celery_broker_transport_options]
+# This section is for specifying options which can be passed to the
+# underlying celery broker transport.  See:
+# http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-broker_transport_options
+
+# The visibility timeout defines the number of seconds to wait for the worker
+# to acknowledge the task before the message is redelivered to another worker.
+# Make sure to increase the visibility timeout to match the time of the longest
+# ETA you're planning to use.
+#
+# visibility_timeout is only supported for Redis and SQS celery brokers.
+# See:
+#   http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options
+#
+#visibility_timeout = 21600
+
 [dask]
 # This section only applies if you are using the DaskExecutor in
 # [core] section above

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c1d7db3/airflow/config_templates/default_celery.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
index 9a5bdf8..d44f2b3 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -23,13 +23,21 @@ from airflow import configuration
 from airflow.exceptions import AirflowConfigException, AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
 
+
+def _broker_supports_visibility_timeout(url):
+    return url.startswith("redis://") or url.startswith("sqs://")
+
+
 log = LoggingMixin().log
 
+broker_url = configuration.conf.get('celery', 'BROKER_URL')
+
 broker_transport_options = configuration.conf.getsection(
     'celery_broker_transport_options'
 )
-if broker_transport_options is None:
-    broker_transport_options = {'visibility_timeout': 21600}
+if 'visibility_timeout' not in broker_transport_options:
+    if _broker_supports_visibility_timeout(broker_url):
+        broker_transport_options = {'visibility_timeout': 21600}
 
 DEFAULT_CELERY_CONFIG = {
     'accept_content': ['json', 'pickle'],
@@ -38,7 +46,7 @@ DEFAULT_CELERY_CONFIG = {
     'task_acks_late': True,
     'task_default_queue': configuration.conf.get('celery', 'DEFAULT_QUEUE'),
     'task_default_exchange': configuration.conf.get('celery', 'DEFAULT_QUEUE'),
-    'broker_url': configuration.conf.get('celery', 'BROKER_URL'),
+    'broker_url': broker_url,
     'broker_transport_options': broker_transport_options,
     'result_backend': configuration.conf.get('celery', 'RESULT_BACKEND'),
     'worker_concurrency': configuration.conf.getint('celery', 'WORKER_CONCURRENCY'),