You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/08/31 09:09:46 UTC
[airflow] branch main updated: Fix schedule_interval in decorated dags (#26082)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c982080ca1 Fix schedule_interval in decorated dags (#26082)
c982080ca1 is described below
commit c982080ca1c824dd26c452bcb420df0f3da1afa8
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Aug 31 10:09:21 2022 +0100
Fix schedule_interval in decorated dags (#26082)
Using schedule_interval in a decorated DAG does not work at the moment. The
issue is that the schedule arg on the dag decorator has a default of None, so when
schedule_interval is set and the schedule arg is not, the schedule arg overrides schedule_interval.
The fix is to give the schedule arg the same default as in DAG.__init__ (NOTSET), as instructed.
---
airflow/models/dag.py | 2 +-
tests/models/test_dag.py | 39 ++++++++++++++++++++++++++++++++++++++-
2 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 825c721032..ee14a4e4af 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3301,7 +3301,7 @@ def dag(
jinja_environment_kwargs: Optional[Dict] = None,
render_template_as_native_obj: bool = False,
tags: Optional[List[str]] = None,
- schedule: Optional[ScheduleArg] = None,
+ schedule: ScheduleArg = NOTSET,
owner_links: Optional[Dict[str, str]] = None,
) -> Callable[[Callable], Callable[..., DAG]]:
"""
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 05a97dfc4d..173a08fb70 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -609,7 +609,7 @@ class TestDag:
def test_following_schedule_relativedelta(self):
"""
- Tests following_schedule a dag with a relativedelta schedule_interval
+ Tests following_schedule a dag with a relativedelta schedule
"""
dag_id = "test_schedule_dag_relativedelta"
delta = relativedelta(hours=+1)
@@ -622,6 +622,43 @@ class TestDag:
_next = dag.following_schedule(_next)
assert _next.isoformat() == "2015-01-02T02:00:00+00:00"
+ def test_following_schedule_relativedelta_with_deprecated_schedule_interval(self):
+ """
+ Tests following_schedule a dag with a relativedelta schedule_interval
+ """
+ dag_id = "test_schedule_dag_relativedelta"
+ delta = relativedelta(hours=+1)
+ dag = DAG(dag_id=dag_id, schedule_interval=delta)
+ dag.add_task(BaseOperator(task_id="faketastic", owner='Also fake', start_date=TEST_DATE))
+
+ _next = dag.following_schedule(TEST_DATE)
+ assert _next.isoformat() == "2015-01-02T01:00:00+00:00"
+
+ _next = dag.following_schedule(_next)
+ assert _next.isoformat() == "2015-01-02T02:00:00+00:00"
+
+ def test_following_schedule_relativedelta_with_depr_schedule_interval_decorated_dag(self):
+ """
+ Tests following_schedule a dag with a relativedelta schedule_interval
+ using decorated dag
+ """
+ from airflow.decorators import dag
+
+ dag_id = "test_schedule_dag_relativedelta"
+ delta = relativedelta(hours=+1)
+
+ @dag(dag_id=dag_id, schedule_interval=delta)
+ def mydag():
+ BaseOperator(task_id="faketastic", owner='Also fake', start_date=TEST_DATE)
+
+ _dag = mydag()
+
+ _next = _dag.following_schedule(TEST_DATE)
+ assert _next.isoformat() == "2015-01-02T01:00:00+00:00"
+
+ _next = _dag.following_schedule(_next)
+ assert _next.isoformat() == "2015-01-02T02:00:00+00:00"
+
def test_previous_schedule_datetime_timezone(self):
# Check that we don't get an AttributeError 'name' for self.timezone