You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by sa...@apache.org on 2017/07/21 20:59:18 UTC
incubator-airflow git commit: [AIRFLOW-1438] Change batch size per query in scheduler
Repository: incubator-airflow
Updated Branches:
refs/heads/master f1f022c1e -> 3547cbffd
[AIRFLOW-1438] Change batch size per query in scheduler
This should help if query size is limited. It also reduces how long
locks are held.
Closes #2462 from saguziel/aguziel-paginate-query
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3547cbff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3547cbff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3547cbff
Branch: refs/heads/master
Commit: 3547cbffdbffac2f98a8aa05526e8c9671221025
Parents: f1f022c
Author: Alex Guziel <al...@airbnb.com>
Authored: Fri Jul 21 13:59:14 2017 -0700
Committer: Alex Guziel <al...@airbnb.com>
Committed: Fri Jul 21 13:59:14 2017 -0700
----------------------------------------------------------------------
airflow/config_templates/default_airflow.cfg | 5 +++
airflow/config_templates/default_test.cfg | 1 +
airflow/jobs.py | 45 ++++++++++++++++++-----
tests/jobs.py | 38 +++++++++++++++++++
4 files changed, 80 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3547cbff/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index ddd1ba8..33cee39 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -325,6 +325,11 @@ scheduler_zombie_task_threshold = 300
# DAG definition (catchup)
catchup_by_default = True
+# Maximum number of task instances handled per query (batch size) in the scheduler's main loop.
+# Choose it based on your database's query length limits and how long you are willing to hold locks.
+# Set to 0 for no limit.
+max_tis_per_query = 0
+
# Statsd (https://github.com/etsy/statsd) integration settings
statsd_on = False
statsd_host = localhost
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3547cbff/airflow/config_templates/default_test.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index f6650af..88b19a5 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -85,6 +85,7 @@ max_threads = 2
catchup_by_default = True
scheduler_zombie_task_threshold = 300
dag_dir_list_interval = 0
+max_tis_per_query = 0
[admin]
hide_sensitive_variable_fields = True
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3547cbff/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 6b63df0..e2f8c94 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -595,6 +595,7 @@ class SchedulerJob(BaseJob):
# Directory where log files for the processes that scheduled the DAGs reside
self.child_process_log_directory = conf.get('scheduler',
'child_process_log_directory')
+ self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
if run_duration is None:
self.run_duration = conf.getint('scheduler',
'run_duration')
@@ -1146,6 +1147,15 @@ class SchedulerJob(BaseJob):
["{}".format(x) for x in executable_tis])
self.logger.info("Setting the follow tasks to queued state:\n\t{}"
.format(task_instance_str))
+ # make the task instances transient so their attributes don't expire on commit
+ for ti in executable_tis:
+ copy_dag_id = ti.dag_id
+ copy_execution_date = ti.execution_date
+ copy_task_id = ti.task_id
+ make_transient(ti)
+ ti.dag_id = copy_dag_id
+ ti.execution_date = copy_execution_date
+ ti.task_id = copy_task_id
return executable_tis
@provide_session
@@ -1289,15 +1299,32 @@ class SchedulerJob(BaseJob):
"""
executable_tis = self._find_executable_task_instances(simple_dag_bag, states,
session=session)
- tis_with_state_changed = self._change_state_for_executable_task_instances(
- executable_tis,
- states,
- session=session)
- self._enqueue_task_instances_with_queued_state(
- simple_dag_bag,
- tis_with_state_changed)
- session.commit()
- return len(tis_with_state_changed)
+ if self.max_tis_per_query == 0:
+ tis_with_state_changed = self._change_state_for_executable_task_instances(
+ executable_tis,
+ states,
+ session=session)
+ self._enqueue_task_instances_with_queued_state(
+ simple_dag_bag,
+ tis_with_state_changed)
+ session.commit()
+ return len(tis_with_state_changed)
+ else:
+ # makes chunks of max_tis_per_query size
+ chunks = ([executable_tis[i:i + self.max_tis_per_query]
+ for i in range(0, len(executable_tis), self.max_tis_per_query)])
+ total_tis_queued = 0
+ for chunk in chunks:
+ tis_with_state_changed = self._change_state_for_executable_task_instances(
+ chunk,
+ states,
+ session=session)
+ self._enqueue_task_instances_with_queued_state(
+ simple_dag_bag,
+ tis_with_state_changed)
+ session.commit()
+ total_tis_queued += len(tis_with_state_changed)
+ return total_tis_queued
def _process_dags(self, dagbag, dags, tis_out):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3547cbff/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index e987e0c..c9ab742 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -1037,6 +1037,44 @@ class SchedulerJobTest(unittest.TestCase):
six.assertCountEqual(self, [State.QUEUED, State.SCHEDULED], [ti3.state, ti4.state])
self.assertEqual(1, res)
+ def test_execute_task_instances_limit(self):
+ dag_id = 'SchedulerJobTest.test_execute_task_instances_limit'
+ task_id_1 = 'dummy_task'
+ task_id_2 = 'dummy_task_2'
+ # It is important that len(tasks) is less than concurrency:
+ # previously, scheduler._execute_task_instances checked the number
+ # of tasks only once, so with a concurrency of 3 we could execute
+ # arbitrarily many tasks in the second run.
+ dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
+ task1 = DummyOperator(dag=dag, task_id=task_id_1)
+ task2 = DummyOperator(dag=dag, task_id=task_id_2)
+ dagbag = SimpleDagBag([dag])
+
+ scheduler = SchedulerJob(**self.default_scheduler_args)
+ scheduler.max_tis_per_query = 3
+ session = settings.Session()
+
+ tis = []
+ for i in range(0, 4):
+ dr = scheduler.create_dag_run(dag)
+ ti1 = TI(task1, dr.execution_date)
+ ti2 = TI(task2, dr.execution_date)
+ tis.append(ti1)
+ tis.append(ti2)
+ ti1.refresh_from_db()
+ ti2.refresh_from_db()
+ ti1.state = State.SCHEDULED
+ ti2.state = State.SCHEDULED
+ session.merge(ti1)
+ session.merge(ti2)
+ session.commit()
+ res = scheduler._execute_task_instances(dagbag, [State.SCHEDULED])
+
+ self.assertEqual(8, res)
+ for ti in tis:
+ ti.refresh_from_db()
+ self.assertEqual(State.QUEUED, ti.state)
+
def test_change_state_for_tis_without_dagrun(self):
dag = DAG(
dag_id='test_change_state_for_tis_without_dagrun',