You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/12/28 05:53:06 UTC

[airflow] branch main updated: Fix calendar view for CronTriggerTimeTable dags (#28411)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 467a5e3ab2 Fix calendar view for CronTriggerTimeTable dags (#28411)
467a5e3ab2 is described below

commit 467a5e3ab287013db2a5381ef4a642e912f8b45b
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Wed Dec 28 00:52:54 2022 -0500

    Fix calendar view for CronTriggerTimeTable dags (#28411)
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
---
 airflow/www/views.py | 45 +++++++++++++++++++++++++++------------------
 1 file changed, 27 insertions(+), 18 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 549db3a291..b262876ac5 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -33,7 +33,7 @@ from datetime import datetime, timedelta
 from functools import wraps
 from json import JSONDecodeError
 from operator import itemgetter
-from typing import Any, Callable, Collection, Mapping, MutableMapping, Sequence
+from typing import Any, Callable, Collection, Iterator, Mapping, MutableMapping, Sequence
 from urllib.parse import unquote, urljoin, urlsplit
 
 import configupdater
@@ -107,8 +107,8 @@ from airflow.providers_manager import ProvidersManager
 from airflow.security import permissions
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS
+from airflow.timetables._cron import CronMixin
 from airflow.timetables.base import DataInterval, TimeRestriction
-from airflow.timetables.interval import CronDataIntervalTimetable
 from airflow.utils import json as utils_json, timezone, yaml
 from airflow.utils.airflow_flask_app import get_airflow_app
 from airflow.utils.dag_edges import dag_edges
@@ -2828,28 +2828,37 @@ class Airflow(AirflowBaseView):
             restriction = TimeRestriction(dag.start_date, dag.end_date, False)
             dates = collections.Counter()
 
-            if isinstance(dag.timetable, CronDataIntervalTimetable):
-                for next in croniter(
-                    dag.timetable.summary, start_time=last_automated_data_interval.end, ret_type=datetime
-                ):
-                    if next is None:
+            if isinstance(dag.timetable, CronMixin):
+                # Optimized calendar generation for timetables based on a cron expression.
+                dates_iter: Iterator[datetime | None] = croniter(
+                    dag.timetable._expression,
+                    start_time=last_automated_data_interval.end,
+                    ret_type=datetime,
+                )
+                for dt in dates_iter:
+                    if dt is None:
                         break
-                    if next.year != year:
+                    if dt.year != year:
                         break
-                    if dag.end_date and next > dag.end_date:
+                    if dag.end_date and dt > dag.end_date:
                         break
-                    dates[next.date()] += 1
+                    dates[dt.date()] += 1
             else:
+                prev_logical_date = datetime.min
                 while True:
-                    info = dag.timetable.next_dagrun_info(
-                        last_automated_data_interval=last_automated_data_interval, restriction=restriction
+                    curr_info = dag.timetable.next_dagrun_info(
+                        last_automated_data_interval=last_automated_data_interval,
+                        restriction=restriction,
                     )
-                    if info is None:
-                        break
-                    if info.logical_date.year != year:
-                        break
-                    last_automated_data_interval = info.data_interval
-                    dates[info.logical_date] += 1
+                    if curr_info is None:
+                        break  # Reached the end.
+                    if curr_info.logical_date <= prev_logical_date:
+                        break  # We're not progressing. Maybe a malformed timetable? Give up.
+                    if curr_info.logical_date.year != year:
+                        break  # Crossed the year boundary.
+                    last_automated_data_interval = curr_info.data_interval
+                    dates[curr_info.logical_date] += 1
+                    prev_logical_date = curr_info.logical_date
 
             data_dag_states.extend(
                 {"date": date.isoformat(), "state": "planned", "count": count}