You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/06/29 15:20:06 UTC
[airflow] 21/45: Optimize calendar view for cron scheduled DAGs (#24262)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 24da78500a3952850ee98b19d2d62b07417ed5ad
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Thu Jun 9 11:58:40 2022 -0600
Optimize calendar view for cron scheduled DAGs (#24262)
(cherry picked from commit 23fb6635ea31bc4d78b2bcdc07767dd8595a06af)
---
airflow/www/views.py | 38 +++++++++++++++++++++++++-------------
1 file changed, 25 insertions(+), 13 deletions(-)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index e251a54ab7..607b7adff5 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -27,7 +27,7 @@ import traceback
import warnings
from bisect import insort_left
from collections import defaultdict
-from datetime import timedelta
+from datetime import datetime, timedelta
from functools import wraps
from json import JSONDecodeError
from operator import itemgetter
@@ -38,6 +38,7 @@ import lazy_object_proxy
import markupsafe
import nvd3
import sqlalchemy as sqla
+from croniter import croniter
from flask import (
Response,
abort,
@@ -115,6 +116,7 @@ from airflow.security import permissions
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS
from airflow.timetables.base import DataInterval, TimeRestriction
+from airflow.timetables.interval import CronDataIntervalTimetable
from airflow.utils import json as utils_json, timezone, yaml
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.dates import infer_time_unit, scale_time_units
@@ -2741,18 +2743,28 @@ class Airflow(AirflowBaseView):
restriction = TimeRestriction(dag.start_date, dag.end_date, False)
dates = collections.Counter()
- while True:
- info = dag.timetable.next_dagrun_info(
- last_automated_data_interval=last_automated_data_interval, restriction=restriction
- )
- if info is None:
- break
-
- if info.logical_date.year != year:
- break
-
- last_automated_data_interval = info.data_interval
- dates[info.logical_date] += 1
+ if isinstance(dag.timetable, CronDataIntervalTimetable):
+ for next in croniter(
+ dag.timetable.summary, start_time=last_automated_data_interval.end, ret_type=datetime
+ ):
+ if next is None:
+ break
+ if next.year != year:
+ break
+ if dag.end_date and next > dag.end_date:
+ break
+ dates[next.date()] += 1
+ else:
+ while True:
+ info = dag.timetable.next_dagrun_info(
+ last_automated_data_interval=last_automated_data_interval, restriction=restriction
+ )
+ if info is None:
+ break
+ if info.logical_date.year != year:
+ break
+ last_automated_data_interval = info.data_interval
+ dates[info.logical_date] += 1
data_dag_states.extend(
{'date': date.isoformat(), 'state': 'planned', 'count': count}