You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/02 23:29:31 UTC
[airflow] 21/33: Rename `[scheduler] max_threads` to `[scheduler]
parsing_processes` (#12605)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 6383f75c21269866c373e3cdfb7323de7ee224b0
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Wed Nov 25 09:33:19 2020 +0000
Rename `[scheduler] max_threads` to `[scheduler] parsing_processes` (#12605)
From Airflow 1.10.14, the `max_threads` option in the `[scheduler]` section has been renamed to `parsing_processes`.
This aligns the name with what the code actually does: the Scheduler launches the number of processes defined by
`[scheduler] parsing_processes` to parse DAG files, calculate the next DagRun date for each DAG,
serialize the DAGs, and store them in the DB.
(cherry picked from commit 486134426bf2cd54fae1f75d9bd50715b8369ca1)
---
UPDATING.md | 7 +++++++
airflow/config_templates/config.yml | 6 +++---
airflow/config_templates/default_airflow.cfg | 6 +++---
airflow/config_templates/default_test.cfg | 2 +-
airflow/configuration.py | 3 +++
airflow/jobs/scheduler_job.py | 2 +-
airflow/utils/dag_processing.py | 4 ++--
docs/faq.rst | 7 +++++--
scripts/in_container/airflow_ci.cfg | 2 +-
tests/utils/test_dag_processing.py | 2 +-
10 files changed, 27 insertions(+), 14 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index 577b644..4ad226a 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -64,6 +64,13 @@ https://developers.google.com/style/inclusive-documentation
-->
## Airflow 1.10.14
+### `[scheduler] max_threads` config has been renamed to `[scheduler] parsing_processes`
+
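+From Airflow 1.10.14, the `max_threads` option in the `[scheduler]` section has been renamed to `parsing_processes`. The old `max_threads` name is still honored as a deprecated fallback (with a deprecation warning), but you should update your configuration to use `parsing_processes`.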
+
+This aligns the name with what the code actually does: the Scheduler launches the number of processes defined by
+`[scheduler] parsing_processes` to parse the DAG files.
+
### Airflow CLI changes in line with 2.0
The Airflow CLI has been organized so that related commands are grouped together as subcommands,
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index e89df22..87ee928 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1439,10 +1439,10 @@
type: string
example: ~
default: ""
- - name: max_threads
+ - name: parsing_processes
description: |
- The scheduler can run multiple threads in parallel to schedule dags.
- This defines how many threads will run.
+ The scheduler can run multiple processes in parallel to parse dags.
+ This defines how many processes will run.
version_added: ~
type: string
example: ~
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index ea64d8f..662fd00 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -695,9 +695,9 @@ statsd_prefix = airflow
# start with the elements of the list (e.g: scheduler,executor,dagrun)
statsd_allow_list =
-# The scheduler can run multiple threads in parallel to schedule dags.
-# This defines how many threads will run.
-max_threads = 2
+# The scheduler can run multiple processes in parallel to parse dags.
+# This defines how many processes will run.
+parsing_processes = 2
authenticate = False
# Turn off scheduler use of cron intervals by setting this to False.
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 3ac2225..30a82a4 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -113,7 +113,7 @@ job_heartbeat_sec = 1
scheduler_heartbeat_sec = 5
scheduler_health_check_threshold = 30
authenticate = true
-max_threads = 2
+parsing_processes = 2
catchup_by_default = True
scheduler_zombie_task_threshold = 300
dag_dir_list_interval = 0
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 290843f..16081a3 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -183,6 +183,9 @@ class AirflowConfigParser(ConfigParser):
'json_format': 'elasticsearch_json_format',
'json_fields': 'elasticsearch_json_fields'
+ },
+ 'scheduler': {
+ 'parsing_processes': 'max_threads'
}
}
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 9149699..b72e2b1 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -391,7 +391,7 @@ class SchedulerJob(BaseJob):
self.do_pickle = do_pickle
super(SchedulerJob, self).__init__(*args, **kwargs)
- self.max_threads = conf.getint('scheduler', 'max_threads')
+ self.max_threads = conf.getint('scheduler', 'parsing_processes')
if log:
self._log = log
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 4a4b240..881a8ce 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -769,10 +769,10 @@ class DagFileProcessorManager(LoggingMixin):
self._dag_ids = dag_ids
self._async_mode = async_mode
- self._parallelism = conf.getint('scheduler', 'max_threads')
+ self._parallelism = conf.getint('scheduler', 'parsing_processes')
if 'sqlite' in conf.get('core', 'sql_alchemy_conn') and self._parallelism > 1:
self.log.warning(
- "Because we cannot use more than 1 thread (max_threads = {}) "
+ "Because we cannot use more than 1 thread (parsing_processes = {}) "
"when using sqlite. So we set parallelism to 1.".format(self._parallelism)
)
self._parallelism = 1
diff --git a/docs/faq.rst b/docs/faq.rst
index 80849e0..c041e0a 100644
--- a/docs/faq.rst
+++ b/docs/faq.rst
@@ -205,8 +205,11 @@ This means ``explicit_defaults_for_timestamp`` is disabled in your mysql server
How to reduce airflow dag scheduling latency in production?
-----------------------------------------------------------
-- ``max_threads``: Scheduler will spawn multiple threads in parallel to schedule dags. This is controlled by ``max_threads`` with default value of 2. User should increase this value to a larger value (e.g numbers of cpus where scheduler runs - 1) in production.
-- ``scheduler_heartbeat_sec``: User should consider to increase ``scheduler_heartbeat_sec`` config to a higher value (e.g 60 secs) which controls how frequent the airflow scheduler gets the heartbeat and updates the job's entry in database.
+- ``parsing_processes``: The scheduler will spawn multiple processes in parallel to parse DAGs.
+ This is controlled by ``parsing_processes``, which defaults to 2.
+ Users should increase this value in production (e.g. to the number of CPUs on the machine where the scheduler runs + 1).
+- ``scheduler_heartbeat_sec``: Users should consider increasing ``scheduler_heartbeat_sec`` to a higher value (e.g. 60 seconds); it controls how frequently the Airflow scheduler heartbeats
+ and updates its job's entry in the database.
Why next_ds or prev_ds might not contain expected values?
---------------------------------------------------------
diff --git a/scripts/in_container/airflow_ci.cfg b/scripts/in_container/airflow_ci.cfg
index b097752..4933af0 100644
--- a/scripts/in_container/airflow_ci.cfg
+++ b/scripts/in_container/airflow_ci.cfg
@@ -52,4 +52,4 @@ _test_only_string = this is a test
job_heartbeat_sec = 1
scheduler_heartbeat_sec = 5
authenticate = true
-max_threads = 2
+parsing_processes = 2
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index 3726a8d..b2bdf35 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -267,7 +267,7 @@ class TestDagFileProcessorManager(unittest.TestCase):
file processors until the next zombie detection logic is invoked.
"""
test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_example_bash_operator.py')
- with conf_vars({('scheduler', 'max_threads'): '1',
+ with conf_vars({('scheduler', 'parsing_processes'): '1',
('core', 'load_examples'): 'False'}):
dagbag = DagBag(test_dag_path)
with create_session() as session: