You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/01/11 22:53:36 UTC

[airflow] branch v2-5-test updated: Fix calendar view for CronTriggerTimeTable dags (#28411)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-5-test by this push:
     new e588322f90 Fix calendar view for CronTriggerTimeTable dags (#28411)
e588322f90 is described below

commit e588322f90ca9d35b688536616d3b2c16a640637
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Wed Dec 28 00:52:54 2022 -0500

    Fix calendar view for CronTriggerTimeTable dags (#28411)
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit 467a5e3ab287013db2a5381ef4a642e912f8b45b)
---
 airflow/www/views.py | 49 +++++++++++++++++++++++++++----------------------
 1 file changed, 27 insertions(+), 22 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index c645a56b58..14a57b5b4f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -33,7 +33,7 @@ from datetime import datetime, timedelta
 from functools import wraps
 from json import JSONDecodeError
 from operator import itemgetter
-from typing import Any, Callable, Collection
+from typing import Any, Callable, Collection, Iterator
 from urllib.parse import unquote, urljoin, urlsplit
 
 import configupdater
@@ -107,8 +107,8 @@ from airflow.providers_manager import ProvidersManager
 from airflow.security import permissions
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS
+from airflow.timetables._cron import CronMixin
 from airflow.timetables.base import DataInterval, TimeRestriction
-from airflow.timetables.interval import CronDataIntervalTimetable
 from airflow.utils import json as utils_json, timezone, yaml
 from airflow.utils.airflow_flask_app import get_airflow_app
 from airflow.utils.dag_edges import dag_edges
@@ -979,7 +979,6 @@ class Airflow(AirflowBaseView):
         )
 
         if conf.getboolean("webserver", "SHOW_RECENT_STATS_FOR_COMPLETED_RUNS", fallback=True):
-
             last_dag_run = (
                 session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label("execution_date"))
                 .join(DagModel, DagModel.dag_id == DagRun.dag_id)
@@ -2730,28 +2729,37 @@ class Airflow(AirflowBaseView):
             restriction = TimeRestriction(dag.start_date, dag.end_date, False)
             dates = collections.Counter()
 
-            if isinstance(dag.timetable, CronDataIntervalTimetable):
-                for next in croniter(
-                    dag.timetable.summary, start_time=last_automated_data_interval.end, ret_type=datetime
-                ):
-                    if next is None:
+            if isinstance(dag.timetable, CronMixin):
+                # Optimized calendar generation for timetables based on a cron expression.
+                dates_iter: Iterator[datetime | None] = croniter(
+                    dag.timetable._expression,
+                    start_time=last_automated_data_interval.end,
+                    ret_type=datetime,
+                )
+                for dt in dates_iter:
+                    if dt is None:
                         break
-                    if next.year != year:
+                    if dt.year != year:
                         break
-                    if dag.end_date and next > dag.end_date:
+                    if dag.end_date and dt > dag.end_date:
                         break
-                    dates[next.date()] += 1
+                    dates[dt.date()] += 1
             else:
+                prev_logical_date = datetime.min
                 while True:
-                    info = dag.timetable.next_dagrun_info(
-                        last_automated_data_interval=last_automated_data_interval, restriction=restriction
+                    curr_info = dag.timetable.next_dagrun_info(
+                        last_automated_data_interval=last_automated_data_interval,
+                        restriction=restriction,
                     )
-                    if info is None:
-                        break
-                    if info.logical_date.year != year:
-                        break
-                    last_automated_data_interval = info.data_interval
-                    dates[info.logical_date] += 1
+                    if curr_info is None:
+                        break  # Reached the end.
+                    if curr_info.logical_date <= prev_logical_date:
+                        break  # We're not progressing. Maybe a malformed timetable? Give up.
+                    if curr_info.logical_date.year != year:
+                        break  # Crossed the year boundary.
+                    last_automated_data_interval = curr_info.data_interval
+                    dates[curr_info.logical_date] += 1
+                    prev_logical_date = curr_info.logical_date
 
             data_dag_states.extend(
                 {"date": date.isoformat(), "state": "planned", "count": count}
@@ -3845,7 +3853,6 @@ class ConfigurationView(AirflowBaseView):
                 for key, (value, source) in parameters.items()
             ]
         elif expose_config.lower() in ["true", "t", "1"]:
-
             with open(AIRFLOW_CONFIG) as file:
                 config = file.read()
             table = [
@@ -4345,7 +4352,6 @@ class ConnectionModelView(AirflowModelView):
                     "warning",
                 )
             else:
-
                 dup_conn = Connection(
                     new_conn_id,
                     selected_conn.conn_type,
@@ -5557,7 +5563,6 @@ class DagDependenciesView(AirflowBaseView):
         )
 
     def _calculate_graph(self):
-
         nodes_dict: dict[str, Any] = {}
         edge_tuples: set[dict[str, str]] = set()