You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/09 17:46:44 UTC

[airflow] branch master updated: Make celery worker_prefetch_multiplier configurable (#8695)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new db1b51d  Make celery worker_prefetch_multiplier configurable (#8695)
db1b51d is described below

commit db1b51df54af76245a0e93d0b2229a96efc7d3d9
Author: Ben Nadler <jb...@gmail.com>
AuthorDate: Sat May 9 10:46:13 2020 -0700

    Make celery worker_prefetch_multiplier configurable (#8695)
---
 airflow/config_templates/config.yml          | 13 +++++++++++++
 airflow/config_templates/default_airflow.cfg | 10 ++++++++++
 airflow/config_templates/default_celery.py   |  2 +-
 3 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 1fb204a..0543cbc 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1130,6 +1130,19 @@
       type: string
       example: 16,12
       default: ~
+    - name: worker_prefetch_multiplier
+      description: |
+        Used to increase the number of tasks that a worker prefetches, which can improve performance.
+        The number of processes multiplied by worker_prefetch_multiplier is the number of tasks
+        that are prefetched by a worker. A value greater than 1 can result in tasks being unnecessarily
+        blocked if there are multiple workers and one worker prefetches tasks that sit behind
+        long-running tasks while another worker has idle processes that cannot pick up the
+        already-claimed, blocked tasks.
+        https://docs.celeryproject.org/en/stable/userguide/optimizing.html#prefetch-limits
+      version_added: ~
+      type: int
+      example: "1"
+      default: ~
     - name: worker_log_server_port
       description: |
         When you start an airflow worker, airflow starts a tiny web server
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index f133e1c..ff82613 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -550,6 +550,16 @@ worker_concurrency = 8
 # Example: worker_autoscale = 16,12
 # worker_autoscale =
 
+# Used to increase the number of tasks that a worker prefetches, which can improve performance.
+# The number of processes multiplied by worker_prefetch_multiplier is the number of tasks
+# that are prefetched by a worker. A value greater than 1 can result in tasks being unnecessarily
+# blocked if there are multiple workers and one worker prefetches tasks that sit behind
+# long-running tasks while another worker has idle processes that cannot pick up the
+# already-claimed, blocked tasks.
+# https://docs.celeryproject.org/en/stable/userguide/optimizing.html#prefetch-limits
+# Example: worker_prefetch_multiplier = 1
+# worker_prefetch_multiplier =
+
 # When you start an airflow worker, airflow starts a tiny web server
 # subprocess to serve the workers local log files to the airflow main
 # web server, who then builds pages and sends them to users. This defines
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
index 1b88b31..bf0b03b 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -39,7 +39,7 @@ if 'visibility_timeout' not in broker_transport_options:
 DEFAULT_CELERY_CONFIG = {
     'accept_content': ['json'],
     'event_serializer': 'json',
-    'worker_prefetch_multiplier': 1,
+    'worker_prefetch_multiplier': conf.getint('celery', 'worker_prefetch_multiplier', fallback=1),
     'task_acks_late': True,
     'task_default_queue': conf.get('celery', 'DEFAULT_QUEUE'),
     'task_default_exchange': conf.get('celery', 'DEFAULT_QUEUE'),