You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/09 17:46:44 UTC
[airflow] branch master updated: Make celery worker_prefetch_multiplier configurable (#8695)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new db1b51d Make celery worker_prefetch_multiplier configurable (#8695)
db1b51d is described below
commit db1b51df54af76245a0e93d0b2229a96efc7d3d9
Author: Ben Nadler <jb...@gmail.com>
AuthorDate: Sat May 9 10:46:13 2020 -0700
Make celery worker_prefetch_multiplier configurable (#8695)
---
airflow/config_templates/config.yml | 13 +++++++++++++
airflow/config_templates/default_airflow.cfg | 10 ++++++++++
airflow/config_templates/default_celery.py | 2 +-
3 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 1fb204a..0543cbc 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1130,6 +1130,19 @@
type: string
example: 16,12
default: ~
+ - name: worker_prefetch_multiplier
+ description: |
+ Used to increase the number of tasks that a worker prefetches, which can improve performance.
+ The number of processes multiplied by worker_prefetch_multiplier is the number of tasks
+ that are prefetched by a worker. A value greater than 1 can result in tasks being unnecessarily
+ blocked if there are multiple workers and one worker prefetches tasks that sit behind
+ long-running tasks while another worker has idle processes that cannot pick up the
+ already-claimed, blocked tasks.
+ https://docs.celeryproject.org/en/stable/userguide/optimizing.html#prefetch-limits
+ version_added: ~
+ type: int
+ example: "1"
+ default: ~
- name: worker_log_server_port
description: |
When you start an airflow worker, airflow starts a tiny web server
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index f133e1c..ff82613 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -550,6 +550,16 @@ worker_concurrency = 8
# Example: worker_autoscale = 16,12
# worker_autoscale =
+# Used to increase the number of tasks that a worker prefetches, which can improve performance.
+# The number of processes multiplied by worker_prefetch_multiplier is the number of tasks
+# that are prefetched by a worker. A value greater than 1 can result in tasks being unnecessarily
+# blocked if there are multiple workers and one worker prefetches tasks that sit behind
+# long-running tasks while another worker has idle processes that cannot pick up the
+# already-claimed, blocked tasks.
+# https://docs.celeryproject.org/en/stable/userguide/optimizing.html#prefetch-limits
+# Example: worker_prefetch_multiplier = 1
+# worker_prefetch_multiplier =
+
# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
# web server, who then builds pages and sends them to users. This defines
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
index 1b88b31..bf0b03b 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -39,7 +39,7 @@ if 'visibility_timeout' not in broker_transport_options:
DEFAULT_CELERY_CONFIG = {
'accept_content': ['json'],
'event_serializer': 'json',
- 'worker_prefetch_multiplier': 1,
+ 'worker_prefetch_multiplier': conf.getint('celery', 'worker_prefetch_multiplier', fallback=1),
'task_acks_late': True,
'task_default_queue': conf.get('celery', 'DEFAULT_QUEUE'),
'task_default_exchange': conf.get('celery', 'DEFAULT_QUEUE'),