You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/01/11 22:53:36 UTC
[airflow] branch v2-5-test updated: Fix calendar view for CronTriggerTimeTable dags (#28411)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-5-test by this push:
new e588322f90 Fix calendar view for CronTriggerTimeTable dags (#28411)
e588322f90 is described below
commit e588322f90ca9d35b688536616d3b2c16a640637
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Wed Dec 28 00:52:54 2022 -0500
Fix calendar view for CronTriggerTimeTable dags (#28411)
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
(cherry picked from commit 467a5e3ab287013db2a5381ef4a642e912f8b45b)
---
airflow/www/views.py | 49 +++++++++++++++++++++++++++----------------------
1 file changed, 27 insertions(+), 22 deletions(-)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index c645a56b58..14a57b5b4f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -33,7 +33,7 @@ from datetime import datetime, timedelta
from functools import wraps
from json import JSONDecodeError
from operator import itemgetter
-from typing import Any, Callable, Collection
+from typing import Any, Callable, Collection, Iterator
from urllib.parse import unquote, urljoin, urlsplit
import configupdater
@@ -107,8 +107,8 @@ from airflow.providers_manager import ProvidersManager
from airflow.security import permissions
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS
+from airflow.timetables._cron import CronMixin
from airflow.timetables.base import DataInterval, TimeRestriction
-from airflow.timetables.interval import CronDataIntervalTimetable
from airflow.utils import json as utils_json, timezone, yaml
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.dag_edges import dag_edges
@@ -979,7 +979,6 @@ class Airflow(AirflowBaseView):
)
if conf.getboolean("webserver", "SHOW_RECENT_STATS_FOR_COMPLETED_RUNS", fallback=True):
-
last_dag_run = (
session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label("execution_date"))
.join(DagModel, DagModel.dag_id == DagRun.dag_id)
@@ -2730,28 +2729,37 @@ class Airflow(AirflowBaseView):
restriction = TimeRestriction(dag.start_date, dag.end_date, False)
dates = collections.Counter()
- if isinstance(dag.timetable, CronDataIntervalTimetable):
- for next in croniter(
- dag.timetable.summary, start_time=last_automated_data_interval.end, ret_type=datetime
- ):
- if next is None:
+ if isinstance(dag.timetable, CronMixin):
+ # Optimized calendar generation for timetables based on a cron expression.
+ dates_iter: Iterator[datetime | None] = croniter(
+ dag.timetable._expression,
+ start_time=last_automated_data_interval.end,
+ ret_type=datetime,
+ )
+ for dt in dates_iter:
+ if dt is None:
break
- if next.year != year:
+ if dt.year != year:
break
- if dag.end_date and next > dag.end_date:
+ if dag.end_date and dt > dag.end_date:
break
- dates[next.date()] += 1
+ dates[dt.date()] += 1
else:
+ prev_logical_date = datetime.min
while True:
- info = dag.timetable.next_dagrun_info(
- last_automated_data_interval=last_automated_data_interval, restriction=restriction
+ curr_info = dag.timetable.next_dagrun_info(
+ last_automated_data_interval=last_automated_data_interval,
+ restriction=restriction,
)
- if info is None:
- break
- if info.logical_date.year != year:
- break
- last_automated_data_interval = info.data_interval
- dates[info.logical_date] += 1
+ if curr_info is None:
+ break # Reached the end.
+ if curr_info.logical_date <= prev_logical_date:
+ break # We're not progressing. Maybe a malformed timetable? Give up.
+ if curr_info.logical_date.year != year:
+ break # Crossed the year boundary.
+ last_automated_data_interval = curr_info.data_interval
+ dates[curr_info.logical_date] += 1
+ prev_logical_date = curr_info.logical_date
data_dag_states.extend(
{"date": date.isoformat(), "state": "planned", "count": count}
@@ -3845,7 +3853,6 @@ class ConfigurationView(AirflowBaseView):
for key, (value, source) in parameters.items()
]
elif expose_config.lower() in ["true", "t", "1"]:
-
with open(AIRFLOW_CONFIG) as file:
config = file.read()
table = [
@@ -4345,7 +4352,6 @@ class ConnectionModelView(AirflowModelView):
"warning",
)
else:
-
dup_conn = Connection(
new_conn_id,
selected_conn.conn_type,
@@ -5557,7 +5563,6 @@ class DagDependenciesView(AirflowBaseView):
)
def _calculate_graph(self):
-
nodes_dict: dict[str, Any] = {}
edge_tuples: set[dict[str, str]] = set()