Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/30 14:52:40 UTC

[airflow] branch v1-10-test updated: Rename `[scheduler] max_threads` to `[scheduler] parsing_processes` (#12605)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new b91a63b  Rename `[scheduler] max_threads` to `[scheduler] parsing_processes` (#12605)
b91a63b is described below

commit b91a63be956c1dc767fa29f04a8159c7a6c3a98d
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Wed Nov 25 09:33:19 2020 +0000

    Rename `[scheduler] max_threads` to `[scheduler] parsing_processes` (#12605)
    
    From Airflow 1.10.14, the `max_threads` config under the `[scheduler]` section has been renamed to `parsing_processes`.
    
    This aligns the name with the actual code: the Scheduler launches the number of processes defined by
    `[scheduler] parsing_processes` to parse DAG files, calculate the next DagRun date for each DAG,
    serialize the DAGs, and store them in the DB.
    
    (cherry picked from commit 486134426bf2cd54fae1f75d9bd50715b8369ca1)
---
 UPDATING.md                                  | 10 ++++++++++
 airflow/config_templates/config.yml          |  6 +++---
 airflow/config_templates/default_airflow.cfg |  6 +++---
 airflow/config_templates/default_test.cfg    |  2 +-
 airflow/configuration.py                     |  3 +++
 airflow/jobs/scheduler_job.py                |  2 +-
 airflow/utils/dag_processing.py              |  4 ++--
 docs/faq.rst                                 |  7 +++++--
 scripts/in_container/airflow_ci.cfg          |  2 +-
 tests/utils/test_dag_processing.py           |  2 +-
 10 files changed, 30 insertions(+), 14 deletions(-)
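
For users, the practical effect is the same value read under a new name. Below is a minimal sketch of reading it from Python, assuming an Airflow 1.10.14 installation; the `conf.getint('scheduler', 'parsing_processes')` call matches the scheduler code in the diff below:

    from airflow.configuration import conf

    # On 1.10.14+, 'parsing_processes' is the canonical key. Per the
    # deprecation mapping added in configuration.py below, a config that
    # still sets 'max_threads' should keep working (behaviour inferred
    # from the diff; hedged, not re-verified here).
    parsing_processes = conf.getint('scheduler', 'parsing_processes')
    print("DAG parsing processes:", parsing_processes)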

diff --git a/UPDATING.md b/UPDATING.md
index c29c8a3..f651c97 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -25,6 +25,7 @@ assists users migrating to a new version.
 <!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
 **Table of contents**
 
+- [Airflow 1.10.14](#airflow-11014)
 - [Airflow 1.10.13](#airflow-11013)
 - [Airflow 1.10.12](#airflow-11012)
 - [Airflow 1.10.11](#airflow-11011)
@@ -61,6 +62,15 @@ More tips can be found in the guide:
 https://developers.google.com/style/inclusive-documentation
 
 -->
+## Airflow 1.10.14
+
+### `[scheduler] max_threads` config has been renamed to `[scheduler] parsing_processes`
+
+From Airflow 1.10.14, the `max_threads` config under the `[scheduler]` section has been renamed to `parsing_processes`.
+
+This aligns the name with the actual code: the Scheduler launches the number of processes defined by
+`[scheduler] parsing_processes` to parse the DAG files.
+
 ## Airflow 1.10.13
 
 ### TimeSensor is now timezone aware
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index e89df22..87ee928 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1439,10 +1439,10 @@
       type: string
       example: ~
       default: ""
-    - name: max_threads
+    - name: parsing_processes
       description: |
-        The scheduler can run multiple threads in parallel to schedule dags.
-        This defines how many threads will run.
+        The scheduler can run multiple processes in parallel to parse dags.
+        This defines how many processes will run.
       version_added: ~
       type: string
       example: ~
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index ea64d8f..662fd00 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -695,9 +695,9 @@ statsd_prefix = airflow
 # start with the elements of the list (e.g: scheduler,executor,dagrun)
 statsd_allow_list =
 
-# The scheduler can run multiple threads in parallel to schedule dags.
-# This defines how many threads will run.
-max_threads = 2
+# The scheduler can run multiple processes in parallel to parse dags.
+# This defines how many processes will run.
+parsing_processes = 2
 authenticate = False
 
 # Turn off scheduler use of cron intervals by setting this to False.
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 3ac2225..30a82a4 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -113,7 +113,7 @@ job_heartbeat_sec = 1
 scheduler_heartbeat_sec = 5
 scheduler_health_check_threshold = 30
 authenticate = true
-max_threads = 2
+parsing_processes = 2
 catchup_by_default = True
 scheduler_zombie_task_threshold = 300
 dag_dir_list_interval = 0
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 290843f..16081a3 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -183,6 +183,9 @@ class AirflowConfigParser(ConfigParser):
             'json_format': 'elasticsearch_json_format',
             'json_fields': 'elasticsearch_json_fields'
 
+        },
+        'scheduler': {
+            'parsing_processes': 'max_threads'
         }
     }
 
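The dict above extends the parser's deprecated-options table, alongside the elasticsearch renames: reading the new key falls back to the old one. A minimal sketch of that pattern follows (a hypothetical helper for illustration, not the actual AirflowConfigParser implementation):

    import warnings

    # Hypothetical stand-in for the deprecation table:
    # new (section, key) -> old key within the same section.
    DEPRECATED_OPTIONS = {('scheduler', 'parsing_processes'): 'max_threads'}

    def get_option(config, section, key):
        # Prefer the new name if the user has set it.
        if key in config.get(section, {}):
            return config[section][key]
        # Otherwise fall back to the deprecated name, with a warning.
        old_key = DEPRECATED_OPTIONS.get((section, key))
        if old_key and old_key in config.get(section, {}):
            warnings.warn(
                "[{section}] {old} is deprecated, use {new} instead".format(
                    section=section, old=old_key, new=key),
                DeprecationWarning)
            return config[section][old_key]
        raise KeyError((section, key))

    # An old-style config that still sets max_threads keeps working:
    cfg = {'scheduler': {'max_threads': '2'}}
    assert get_option(cfg, 'scheduler', 'parsing_processes') == '2'
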
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 9149699..b72e2b1 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -391,7 +391,7 @@ class SchedulerJob(BaseJob):
         self.do_pickle = do_pickle
         super(SchedulerJob, self).__init__(*args, **kwargs)
 
-        self.max_threads = conf.getint('scheduler', 'max_threads')
+        self.max_threads = conf.getint('scheduler', 'parsing_processes')
 
         if log:
             self._log = log
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 4a4b240..881a8ce 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -769,10 +769,10 @@ class DagFileProcessorManager(LoggingMixin):
         self._dag_ids = dag_ids
         self._async_mode = async_mode
 
-        self._parallelism = conf.getint('scheduler', 'max_threads')
+        self._parallelism = conf.getint('scheduler', 'parsing_processes')
         if 'sqlite' in conf.get('core', 'sql_alchemy_conn') and self._parallelism > 1:
             self.log.warning(
-                "Because we cannot use more than 1 thread (max_threads = {}) "
+                "Because we cannot use more than 1 thread (parsing_processes = {}) "
                 "when using sqlite. So we set parallelism to 1.".format(self._parallelism)
             )
             self._parallelism = 1
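
The guard above forces parallelism down to 1 on SQLite, which cannot safely serve multiple writer processes. The same decision in isolation (hypothetical helper name, for illustration only):

    # Condensed form of the check in DagFileProcessorManager above.
    def effective_parallelism(sql_alchemy_conn, parsing_processes):
        # SQLite cannot handle concurrent writers, so force 1 regardless
        # of the configured parsing_processes value.
        if 'sqlite' in sql_alchemy_conn and parsing_processes > 1:
            return 1
        return parsing_processes

    assert effective_parallelism('sqlite:////tmp/airflow.db', 4) == 1
    assert effective_parallelism('postgresql://airflow@localhost/airflow', 4) == 4
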
diff --git a/docs/faq.rst b/docs/faq.rst
index 80849e0..c041e0a 100644
--- a/docs/faq.rst
+++ b/docs/faq.rst
@@ -205,8 +205,11 @@ This means ``explicit_defaults_for_timestamp`` is disabled in your mysql server
 How to reduce airflow dag scheduling latency in production?
 -----------------------------------------------------------
 
-- ``max_threads``: Scheduler will spawn multiple threads in parallel to schedule dags. This is controlled by ``max_threads`` with default value of 2. User should increase this value to a larger value (e.g numbers of cpus where scheduler runs - 1) in production.
-- ``scheduler_heartbeat_sec``: User should consider to increase ``scheduler_heartbeat_sec`` config to a higher value (e.g 60 secs) which controls how frequent the airflow scheduler gets the heartbeat and updates the job's entry in database.
+- ``parsing_processes``: The scheduler spawns multiple processes in parallel to parse DAGs.
+  This is controlled by ``parsing_processes``, with a default value of 2.
+  Users should increase this value in production (e.g. to the number of CPUs on the scheduler host + 1).
+- ``scheduler_heartbeat_sec``: Users should consider increasing the ``scheduler_heartbeat_sec`` config to a higher value (e.g. 60 seconds), which controls how frequently the Airflow scheduler
+  heartbeats and updates the job's entry in the database.
 
 Why next_ds or prev_ds might not contain expected values?
 ---------------------------------------------------------
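
The option can also be set without editing airflow.cfg, assuming Airflow's standard AIRFLOW__<SECTION>__<KEY> environment-variable override:

    import os

    # Must be set before the scheduler process starts; equivalent to
    # 'parsing_processes = 4' under [scheduler] in airflow.cfg.
    os.environ['AIRFLOW__SCHEDULER__PARSING_PROCESSES'] = '4'
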
diff --git a/scripts/in_container/airflow_ci.cfg b/scripts/in_container/airflow_ci.cfg
index b097752..4933af0 100644
--- a/scripts/in_container/airflow_ci.cfg
+++ b/scripts/in_container/airflow_ci.cfg
@@ -52,4 +52,4 @@ _test_only_string = this is a test
 job_heartbeat_sec = 1
 scheduler_heartbeat_sec = 5
 authenticate = true
-max_threads = 2
+parsing_processes = 2
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index 3726a8d..b2bdf35 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -267,7 +267,7 @@ class TestDagFileProcessorManager(unittest.TestCase):
         file processors until the next zombie detection logic is invoked.
         """
         test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_example_bash_operator.py')
-        with conf_vars({('scheduler', 'max_threads'): '1',
+        with conf_vars({('scheduler', 'parsing_processes'): '1',
                         ('core', 'load_examples'): 'False'}):
             dagbag = DagBag(test_dag_path)
             with create_session() as session:
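
The `conf_vars` context manager used in this test temporarily overrides configuration for the duration of the block. A usage sketch (import path as in Airflow's test tree, assumed to be on sys.path):

    from tests.test_utils.config import conf_vars
    from airflow.configuration import conf

    with conf_vars({('scheduler', 'parsing_processes'): '1'}):
        # Inside the block the override is visible...
        assert conf.getint('scheduler', 'parsing_processes') == 1
    # ...and the original value is restored on exit.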