You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/12/28 05:53:06 UTC
[airflow] branch main updated: Fix calendar view for CronTriggerTimeTable dags (#28411)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 467a5e3ab2 Fix calendar view for CronTriggerTimeTable dags (#28411)
467a5e3ab2 is described below
commit 467a5e3ab287013db2a5381ef4a642e912f8b45b
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Wed Dec 28 00:52:54 2022 -0500
Fix calendar view for CronTriggerTimeTable dags (#28411)
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
---
airflow/www/views.py | 45 +++++++++++++++++++++++++++------------------
1 file changed, 27 insertions(+), 18 deletions(-)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 549db3a291..b262876ac5 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -33,7 +33,7 @@ from datetime import datetime, timedelta
from functools import wraps
from json import JSONDecodeError
from operator import itemgetter
-from typing import Any, Callable, Collection, Mapping, MutableMapping, Sequence
+from typing import Any, Callable, Collection, Iterator, Mapping, MutableMapping, Sequence
from urllib.parse import unquote, urljoin, urlsplit
import configupdater
@@ -107,8 +107,8 @@ from airflow.providers_manager import ProvidersManager
from airflow.security import permissions
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS
+from airflow.timetables._cron import CronMixin
from airflow.timetables.base import DataInterval, TimeRestriction
-from airflow.timetables.interval import CronDataIntervalTimetable
from airflow.utils import json as utils_json, timezone, yaml
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.dag_edges import dag_edges
@@ -2828,28 +2828,37 @@ class Airflow(AirflowBaseView):
restriction = TimeRestriction(dag.start_date, dag.end_date, False)
dates = collections.Counter()
- if isinstance(dag.timetable, CronDataIntervalTimetable):
- for next in croniter(
- dag.timetable.summary, start_time=last_automated_data_interval.end, ret_type=datetime
- ):
- if next is None:
+ if isinstance(dag.timetable, CronMixin):
+ # Optimized calendar generation for timetables based on a cron expression.
+ dates_iter: Iterator[datetime | None] = croniter(
+ dag.timetable._expression,
+ start_time=last_automated_data_interval.end,
+ ret_type=datetime,
+ )
+ for dt in dates_iter:
+ if dt is None:
break
- if next.year != year:
+ if dt.year != year:
break
- if dag.end_date and next > dag.end_date:
+ if dag.end_date and dt > dag.end_date:
break
- dates[next.date()] += 1
+ dates[dt.date()] += 1
else:
+ prev_logical_date = datetime.min
while True:
- info = dag.timetable.next_dagrun_info(
- last_automated_data_interval=last_automated_data_interval, restriction=restriction
+ curr_info = dag.timetable.next_dagrun_info(
+ last_automated_data_interval=last_automated_data_interval,
+ restriction=restriction,
)
- if info is None:
- break
- if info.logical_date.year != year:
- break
- last_automated_data_interval = info.data_interval
- dates[info.logical_date] += 1
+ if curr_info is None:
+ break # Reached the end.
+ if curr_info.logical_date <= prev_logical_date:
+ break # We're not progressing. Maybe a malformed timetable? Give up.
+ if curr_info.logical_date.year != year:
+ break # Crossed the year boundary.
+ last_automated_data_interval = curr_info.data_interval
+ dates[curr_info.logical_date] += 1
+ prev_logical_date = curr_info.logical_date
data_dag_states.extend(
{"date": date.isoformat(), "state": "planned", "count": count}