You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/09/11 22:01:38 UTC

[GitHub] r39132 closed pull request #1906: [AIRFLOW-536] Schedule all pending DAG runs in a single scheduler loop

r39132 closed pull request #1906: [AIRFLOW-536] Schedule all pending DAG runs in a single scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/1906
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the diff is reproduced
below (GitHub would not otherwise display it):

diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 7a4065eb07..8b29b690dc 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -104,10 +104,11 @@ def heartbeat(self):
         self.logger.debug("{} in queue".format(len(self.queued_tasks)))
         self.logger.debug("{} open slots".format(open_slots))
 
+        # sort by priority (descending) and execution date (ascending)
         sorted_queue = sorted(
             [(k, v) for k, v in self.queued_tasks.items()],
-            key=lambda x: x[1][1],
-            reverse=True)
+            key=lambda x: (-x[1][1], x[0][2]),
+            reverse=False)
         for i in range(min((open_slots, len(self.queued_tasks)))):
             key, (command, _, queue, ti) = sorted_queue.pop(0)
             # TODO(jlowin) without a way to know what Job ran which tasks,
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 8bb93bbc66..e60377706b 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1133,9 +1133,13 @@ def _process_dags(self, dagbag, dags, tis_out):
 
             self.logger.info("Processing {}".format(dag.dag_id))
 
-            dag_run = self.create_dag_run(dag)
-            if dag_run:
-                self.logger.info("Created {}".format(dag_run))
+            # create all valid DAG runs in a single scheduler invocation
+            while True:
+                dag_run = self.create_dag_run(dag)
+                if dag_run:
+                    self.logger.info("Created DAG run {}".format(dag_run))
+                else:
+                    break
             self._process_task_instances(dag, tis_out)
             self.manage_slas(dag)
 
diff --git a/scripts/perf/scheduler_ops_metrics.py b/scripts/perf/scheduler_ops_metrics.py
index 40e1b362e8..bab9ad81c5 100644
--- a/scripts/perf/scheduler_ops_metrics.py
+++ b/scripts/perf/scheduler_ops_metrics.py
@@ -24,7 +24,7 @@
 
 SUBDIR = 'scripts/perf/dags'
 DAG_IDS = ['perf_dag_1', 'perf_dag_2']
-MAX_RUNTIME_SECS = 6
+MAX_RUNTIME_SECS = 600
 
 
 class SchedulerMetricsJob(SchedulerJob):
@@ -179,7 +179,8 @@ def main():
     clear_dag_runs()
     clear_dag_task_instances()
 
-    job = SchedulerMetricsJob(dag_ids=DAG_IDS, subdir=SUBDIR)
+    job = SchedulerMetricsJob(dag_ids=DAG_IDS, subdir=SUBDIR,
+        processor_poll_interval=1.0)
     job.run()
 
 
diff --git a/tests/jobs.py b/tests/jobs.py
index 53626ee908..0c7d5ce9b8 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -37,6 +37,7 @@
 from airflow.utils.dag_processing import SimpleDagBag
 from mock import patch
 from tests.executor.test_executor import TestExecutor
+from tests.test_utils.fake_datetime import FakeDatetime
 
 from airflow import configuration
 configuration.load_test_config()
@@ -851,6 +852,7 @@ def test_scheduler_auto_align(self):
         self.assertIsNotNone(dr)
         self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 1, 10, 10))
 
+    @mock.patch('airflow.jobs.datetime', FakeDatetime)
     def test_scheduler_reschedule(self):
         """
         Checks if tasks that are not taken up by the executor
@@ -894,10 +896,14 @@ def do_schedule(function, function2):
             scheduler.heartrate = 0
             scheduler.run()
 
+        FakeDatetime.now = classmethod(
+            lambda cls: DEFAULT_DATE+datetime.timedelta(days=1))
         do_schedule()
         self.assertEquals(1, len(executor.queued_tasks))
         executor.queued_tasks.clear()
 
+        FakeDatetime.now = classmethod(
+            lambda cls: DEFAULT_DATE+datetime.timedelta(days=2))
         do_schedule()
         self.assertEquals(2, len(executor.queued_tasks))
 
@@ -1027,11 +1033,15 @@ def test_scheduler_run_duration(self):
                      expected_run_duration)
         assert run_duration - expected_run_duration < 5.0
 
+    @mock.patch('airflow.jobs.datetime', FakeDatetime)
     def test_dag_with_system_exit(self):
         """
-        Test to check that a DAG with a system.exit() doesn't break the scheduler.
+        Test to check that a DAG with a system.exit() doesn't break the
+        scheduler.
         """
 
+        FakeDatetime.now = classmethod(
+            lambda cls: datetime.datetime(2000, 1, 2))
         dag_id = 'exit_test_dag'
         dag_ids = [dag_id]
         dag_directory = os.path.join(models.DAGS_FOLDER,
@@ -1209,3 +1219,29 @@ def test_dag_catchup_option(self):
 
         # The DR should be scheduled BEFORE now
         self.assertLess(dr.execution_date, datetime.datetime.now())
+
+    @mock.patch('airflow.jobs.datetime', FakeDatetime)
+    def test_schedule_multiple_dag_runs(self):
+        """
+        Test to check that multiple valid DAG runs are scheduled in a single
+        scheduler loop.
+        """
+        # The start date for test_start_date_scheduling is 2100-1-1. If we mock
+        # datetime.now() as 2100-1-8, DAG runs for 2100-1-1 to 2100-1-7 should
+        # get scheduled in a single scheduler loop
+        FakeDatetime.now = classmethod(
+            lambda cls: datetime.datetime(2100, 1, 8))
+        dag_id = 'test_start_date_scheduling'
+        dag_ids = [dag_id]
+        dag_directory = os.path.join(models.DAGS_FOLDER,
+                                     "..",
+                                     "dags")
+
+        scheduler = SchedulerJob(dag_ids=dag_ids,
+                                 subdir= dag_directory,
+                                 num_runs=1,
+                                 **self.default_scheduler_args)
+        scheduler.run()
+        session = settings.Session()
+        self.assertEqual(
+            len(session.query(TI).filter(TI.dag_id == dag_id).all()), 7)
diff --git a/tests/test_utils/fake_datetime.py b/tests/test_utils/fake_datetime.py
index 9b8102f38f..a2856f6f72 100644
--- a/tests/test_utils/fake_datetime.py
+++ b/tests/test_utils/fake_datetime.py
@@ -21,4 +21,4 @@ class FakeDatetime(datetime):
     """
 
     def __new__(cls, *args, **kwargs):
-        return date.__new__(datetime, *args, **kwargs)
+        return datetime.__new__(datetime, *args, **kwargs)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services