You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink ("here") that is not preserved in this text copy.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/30 13:33:06 UTC

[airflow] branch v1-10-test updated: [AIRFLOW-1156] BugFix: Unpausing a DAG with catchup=False creates an extra DAG run (#8776)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 98443d2  [AIRFLOW-1156] BugFix: Unpausing a DAG with catchup=False creates an extra DAG run (#8776)
98443d2 is described below

commit 98443d25e47b14f2e25e6dc865592943ad339c99
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon May 11 22:25:45 2020 +0100

    [AIRFLOW-1156] BugFix: Unpausing a DAG with catchup=False creates an extra DAG run (#8776)
    
    (cherry picked from commit 3ad4f96bae78f16a2240567f65831ca269672d7b)
---
 airflow/jobs/scheduler_job.py    |  2 +-
 tests/jobs/test_scheduler_job.py | 55 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 5b06be8..f290c57 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -668,7 +668,7 @@ class SchedulerJob(BaseJob):
                 now = timezone.utcnow()
                 next_start = dag.following_schedule(now)
                 last_start = dag.previous_schedule(now)
-                if next_start <= now:
+                if next_start <= now or isinstance(dag.schedule_interval, timedelta):
                     new_start = last_start
                 else:
                     new_start = dag.previous_schedule(last_start)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index f756e93..181d193 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -34,6 +34,8 @@ from tempfile import mkdtemp
 import psutil
 import pytest
 import six
+from freezegun import freeze_time
+
 from airflow.models.taskinstance import TaskInstance
 from parameterized import parameterized
 
@@ -84,6 +86,13 @@ class SchedulerJobTest(unittest.TestCase):
         # enqueue!
         self.null_exec = MockExecutor()
 
+    def tearDown(self):
+        clear_db_runs()
+        clear_db_pools()
+        clear_db_dags()
+        clear_db_sla_miss()
+        clear_db_errors()
+
     def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timedelta(hours=1), **kwargs):
         dag = DAG(
             dag_id='test_scheduler_reschedule',
@@ -1385,6 +1394,52 @@ class SchedulerJobTest(unittest.TestCase):
         dr = scheduler.create_dag_run(dag)
         self.assertIsNone(dr)
 
+    @freeze_time(timezone.datetime(2020, 1, 5))
+    def test_scheduler_dagrun_with_timedelta_schedule_and_catchup_false(self):
+        """
+        Test that the dag file processor does not create multiple dagruns
+        if a dag is scheduled with 'timedelta' and catchup=False
+        """
+        dag = DAG(
+            'test_scheduler_dagrun_once_with_timedelta_and_catchup_false',
+            start_date=timezone.datetime(2015, 1, 1),
+            schedule_interval=timedelta(days=1),
+            catchup=False)
+
+        scheduler_job = SchedulerJob()
+        dag.clear()
+        dr = scheduler_job.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+        self.assertEqual(dr.execution_date, timezone.datetime(2020, 1, 4))
+        dr = scheduler_job.create_dag_run(dag)
+        self.assertIsNone(dr)
+
+    @freeze_time(timezone.datetime(2020, 5, 4))
+    def test_scheduler_dagrun_with_timedelta_schedule_and_catchup_true(self):
+        """
+        Test that the dag file processor creates multiple dagruns
+        if a dag is scheduled with 'timedelta' and catchup=True
+        """
+        dag = DAG(
+            'test_scheduler_dagrun_once_with_timedelta_and_catchup_true',
+            start_date=timezone.datetime(2020, 5, 1),
+            schedule_interval=timedelta(days=1),
+            catchup=True)
+
+        scheduler_job = SchedulerJob()
+        dag.clear()
+        dr = scheduler_job.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+        self.assertEqual(dr.execution_date, timezone.datetime(2020, 5, 1))
+        dr = scheduler_job.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+        self.assertEqual(dr.execution_date, timezone.datetime(2020, 5, 2))
+        dr = scheduler_job.create_dag_run(dag)
+        self.assertIsNotNone(dr)
+        self.assertEqual(dr.execution_date, timezone.datetime(2020, 5, 3))
+        dr = scheduler_job.create_dag_run(dag)
+        self.assertIsNone(dr)
+
     @parameterized.expand([
         [State.NONE, None, None],
         [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),