You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/11 21:26:17 UTC
[airflow] branch master updated: [AIRFLOW-1156] BugFix: Unpausing a DAG with catchup=False creates an extra DAG run (#8776)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 3ad4f96 [AIRFLOW-1156] BugFix: Unpausing a DAG with catchup=False creates an extra DAG run (#8776)
3ad4f96 is described below
commit 3ad4f96bae78f16a2240567f65831ca269672d7b
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon May 11 22:25:45 2020 +0100
[AIRFLOW-1156] BugFix: Unpausing a DAG with catchup=False creates an extra DAG run (#8776)
---
airflow/jobs/scheduler_job.py | 2 +-
tests/jobs/test_scheduler_job.py | 54 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index d37c72a..fa8a77a 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -583,7 +583,7 @@ class DagFileProcessor(LoggingMixin):
now = timezone.utcnow()
next_start = dag.following_schedule(now)
last_start = dag.previous_schedule(now)
- if next_start <= now:
+ if next_start <= now or isinstance(dag.schedule_interval, timedelta):
new_start = last_start
else:
new_start = dag.previous_schedule(last_start)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index d4ad45c..0db33a3 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -28,6 +28,7 @@ import mock
import psutil
import pytest
import six
+from freezegun import freeze_time
from mock import MagicMock, patch
from parameterized import parameterized
@@ -96,6 +97,13 @@ class TestDagFileProcessor(unittest.TestCase):
# enqueue!
self.null_exec = MockExecutor()
+ def tearDown(self) -> None:
+ clear_db_runs()
+ clear_db_pools()
+ clear_db_dags()
+ clear_db_sla_miss()
+ clear_db_errors()
+
def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timedelta(hours=1), **kwargs):
dag = DAG(
dag_id='test_scheduler_reschedule',
@@ -410,6 +418,52 @@ class TestDagFileProcessor(unittest.TestCase):
dr = dag_file_processor.create_dag_run(dag)
self.assertIsNone(dr)
+ @freeze_time(timezone.datetime(2020, 1, 5))
+ def test_dag_file_processor_dagrun_with_timedelta_schedule_and_catchup_false(self):
+ """
+ Test that the dag file processor does not create multiple dagruns
+ if a dag is scheduled with 'timedelta' and catchup=False
+ """
+ dag = DAG(
+ 'test_scheduler_dagrun_once_with_timedelta_and_catchup_false',
+ start_date=timezone.datetime(2015, 1, 1),
+ schedule_interval=timedelta(days=1),
+ catchup=False)
+
+ dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+ dag.clear()
+ dr = dag_file_processor.create_dag_run(dag)
+ self.assertIsNotNone(dr)
+ self.assertEqual(dr.execution_date, timezone.datetime(2020, 1, 4))
+ dr = dag_file_processor.create_dag_run(dag)
+ self.assertIsNone(dr)
+
+ @freeze_time(timezone.datetime(2020, 5, 4))
+ def test_dag_file_processor_dagrun_with_timedelta_schedule_and_catchup_true(self):
+ """
+ Test that the dag file processor creates multiple dagruns
+ if a dag is scheduled with 'timedelta' and catchup=True
+ """
+ dag = DAG(
+ 'test_scheduler_dagrun_once_with_timedelta_and_catchup_true',
+ start_date=timezone.datetime(2020, 5, 1),
+ schedule_interval=timedelta(days=1),
+ catchup=True)
+
+ dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+ dag.clear()
+ dr = dag_file_processor.create_dag_run(dag)
+ self.assertIsNotNone(dr)
+ self.assertEqual(dr.execution_date, timezone.datetime(2020, 5, 1))
+ dr = dag_file_processor.create_dag_run(dag)
+ self.assertIsNotNone(dr)
+ self.assertEqual(dr.execution_date, timezone.datetime(2020, 5, 2))
+ dr = dag_file_processor.create_dag_run(dag)
+ self.assertIsNotNone(dr)
+ self.assertEqual(dr.execution_date, timezone.datetime(2020, 5, 3))
+ dr = dag_file_processor.create_dag_run(dag)
+ self.assertIsNone(dr)
+
@parameterized.expand([
[State.NONE, None, None],
[State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),