You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in this plain-text rendering.
Posted to commits@airflow.apache.org by sa...@apache.org on 2017/07/21 20:59:18 UTC

incubator-airflow git commit: [AIRFLOW-1438] Change batch size per query in scheduler

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f1f022c1e -> 3547cbffd


[AIRFLOW-1438] Change batch size per query in scheduler

This should help when the database limits query size. It also
reduces how long locks are held.

Closes #2462 from saguziel/aguziel-paginate-query


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3547cbff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3547cbff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3547cbff

Branch: refs/heads/master
Commit: 3547cbffdbffac2f98a8aa05526e8c9671221025
Parents: f1f022c
Author: Alex Guziel <al...@airbnb.com>
Authored: Fri Jul 21 13:59:14 2017 -0700
Committer: Alex Guziel <al...@airbnb.com>
Committed: Fri Jul 21 13:59:14 2017 -0700

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg |  5 +++
 airflow/config_templates/default_test.cfg    |  1 +
 airflow/jobs.py                              | 45 ++++++++++++++++++-----
 tests/jobs.py                                | 38 +++++++++++++++++++
 4 files changed, 80 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3547cbff/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index ddd1ba8..33cee39 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -325,6 +325,11 @@ scheduler_zombie_task_threshold = 300
 # DAG definition (catchup)
 catchup_by_default = True
 
+# This changes the batch size of queries in the scheduling main loop.
+# This depends on query length limits and how long you are willing to hold locks.
+# 0 for no limit
+max_tis_per_query = 0
+
 # Statsd (https://github.com/etsy/statsd) integration settings
 statsd_on = False
 statsd_host = localhost

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3547cbff/airflow/config_templates/default_test.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index f6650af..88b19a5 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -85,6 +85,7 @@ max_threads = 2
 catchup_by_default = True
 scheduler_zombie_task_threshold = 300
 dag_dir_list_interval = 0
+max_tis_per_query = 0
 
 [admin]
 hide_sensitive_variable_fields = True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3547cbff/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 6b63df0..e2f8c94 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -595,6 +595,7 @@ class SchedulerJob(BaseJob):
         # Directory where log files for the processes that scheduled the DAGs reside
         self.child_process_log_directory = conf.get('scheduler',
                                                     'child_process_log_directory')
+        self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         if run_duration is None:
             self.run_duration = conf.getint('scheduler',
                                             'run_duration')
@@ -1146,6 +1147,15 @@ class SchedulerJob(BaseJob):
             ["{}".format(x) for x in executable_tis])
         self.logger.info("Setting the follow tasks to queued state:\n\t{}"
                          .format(task_instance_str))
+        # so these dont expire on commit
+        for ti in executable_tis:
+            copy_dag_id = ti.dag_id
+            copy_execution_date = ti.execution_date
+            copy_task_id = ti.task_id
+            make_transient(ti)
+            ti.dag_id = copy_dag_id
+            ti.execution_date = copy_execution_date
+            ti.task_id = copy_task_id
         return executable_tis
 
     @provide_session
@@ -1289,15 +1299,32 @@ class SchedulerJob(BaseJob):
         """
         executable_tis = self._find_executable_task_instances(simple_dag_bag, states,
                                                               session=session)
-        tis_with_state_changed = self._change_state_for_executable_task_instances(
-            executable_tis,
-            states,
-            session=session)
-        self._enqueue_task_instances_with_queued_state(
-            simple_dag_bag,
-            tis_with_state_changed)
-        session.commit()
-        return len(tis_with_state_changed)
+        if self.max_tis_per_query == 0:
+            tis_with_state_changed = self._change_state_for_executable_task_instances(
+                executable_tis,
+                states,
+                session=session)
+            self._enqueue_task_instances_with_queued_state(
+                simple_dag_bag,
+                tis_with_state_changed)
+            session.commit()
+            return len(tis_with_state_changed)
+        else:
+            # makes chunks of max_tis_per_query size
+            chunks = ([executable_tis[i:i + self.max_tis_per_query]
+                      for i in range(0, len(executable_tis), self.max_tis_per_query)])
+            total_tis_queued = 0
+            for chunk in chunks:
+                tis_with_state_changed = self._change_state_for_executable_task_instances(
+                    chunk,
+                    states,
+                    session=session)
+                self._enqueue_task_instances_with_queued_state(
+                    simple_dag_bag,
+                    tis_with_state_changed)
+                session.commit()
+                total_tis_queued += len(tis_with_state_changed)
+            return total_tis_queued
 
     def _process_dags(self, dagbag, dags, tis_out):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3547cbff/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index e987e0c..c9ab742 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -1037,6 +1037,44 @@ class SchedulerJobTest(unittest.TestCase):
         six.assertCountEqual(self, [State.QUEUED, State.SCHEDULED], [ti3.state, ti4.state])
         self.assertEqual(1, res)
 
+    def test_execute_task_instances_limit(self):
+        dag_id = 'SchedulerJobTest.test_execute_task_instances_limit'
+        task_id_1 = 'dummy_task'
+        task_id_2 = 'dummy_task_2'
+        # important that len(tasks) is less than concurrency
+        # because before scheduler._execute_task_instances would only
+        # check the num tasks once so if concurrency was 3,
+        # we could execute arbitrarily many tasks in the second run
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
+        task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        task2 = DummyOperator(dag=dag, task_id=task_id_2)
+        dagbag = SimpleDagBag([dag])
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler.max_tis_per_query = 3
+        session = settings.Session()
+
+        tis = []
+        for i in range(0, 4):
+            dr = scheduler.create_dag_run(dag)
+            ti1 = TI(task1, dr.execution_date)
+            ti2 = TI(task2, dr.execution_date)
+            tis.append(ti1)
+            tis.append(ti2)
+            ti1.refresh_from_db()
+            ti2.refresh_from_db()
+            ti1.state = State.SCHEDULED
+            ti2.state = State.SCHEDULED
+            session.merge(ti1)
+            session.merge(ti2)
+            session.commit()
+        res = scheduler._execute_task_instances(dagbag, [State.SCHEDULED])
+
+        self.assertEqual(8, res)
+        for ti in tis:
+            ti.refresh_from_db()
+            self.assertEqual(State.QUEUED, ti.state)
+
     def test_change_state_for_tis_without_dagrun(self):
         dag = DAG(
             dag_id='test_change_state_for_tis_without_dagrun',