You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/08/18 06:28:15 UTC
[airflow] branch main updated: Allow per-timetable ordering override in grid view (#25633)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 36eea1c8e0 Allow per-timetable ordering override in grid view (#25633)
36eea1c8e0 is described below
commit 36eea1c8e05a6791d144e74f4497855e35baeaac
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Thu Aug 18 14:28:06 2022 +0800
Allow per-timetable ordering override in grid view (#25633)
---
airflow/jobs/scheduler_job.py | 23 ++++++++++-------------
airflow/models/dag.py | 4 ++--
airflow/timetables/base.py | 8 +++++++-
airflow/timetables/simple.py | 1 +
airflow/www/views.py | 4 +++-
newsfragments/25090.significant.rst | 5 +++++
6 files changed, 28 insertions(+), 17 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 86973fc9e4..8ca31dd1c9 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -25,7 +25,7 @@ import sys
import time
import warnings
from collections import defaultdict
-from datetime import timedelta
+from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Collection, DefaultDict, Dict, Iterator, List, Optional, Set, Tuple
from sqlalchemy import func, not_, or_, text
@@ -1059,25 +1059,21 @@ class SchedulerJob(BaseJob):
# memory for larger dags? or expunge_all()
def _create_dag_runs_dataset_triggered(
- self, dag_models: Collection[DagModel], dataset_triggered_dag_info: Dict, session: Session
+ self,
+ dag_models: Collection[DagModel],
+ dataset_triggered_dag_info: Dict[str, Tuple[datetime, datetime]],
+ session: Session,
) -> None:
"""For DAGs that are triggered by datasets, create dag runs."""
# Bulk Fetch DagRuns with dag_id and execution_date same
# as DagModel.dag_id and DagModel.next_dagrun
# This set is used to verify whether a DagRun already exists so that we don't attempt to create
# duplicate dag runs
- exec_dates = {
- dag_id: last_time for dag_id, (first_time, last_time) in dataset_triggered_dag_info.items()
- }
- existing_dagruns = (
- session.query(DagRun.dag_id, DagRun.execution_date)
- .filter(
- tuple_in_condition(
- (DagRun.dag_id, DagRun.execution_date),
- list(exec_dates.items()),
- ),
+ exec_dates = {dag_id: last_time for dag_id, (_, last_time) in dataset_triggered_dag_info.items()}
+ existing_dagruns: Set[Tuple[str, datetime]] = set(
+ session.query(DagRun.dag_id, DagRun.execution_date).filter(
+ tuple_in_condition((DagRun.dag_id, DagRun.execution_date), exec_dates.items())
)
- .all()
)
for dag_model in dag_models:
@@ -1101,6 +1097,7 @@ class SchedulerJob(BaseJob):
dag_run = dag.create_dagrun(
run_type=DagRunType.DATASET_TRIGGERED,
execution_date=exec_date,
+ data_interval=(exec_date, exec_date),
state=DagRunState.QUEUED,
external_trigger=False,
session=session,
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 6fa9067ffc..42b9b6ea15 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3157,11 +3157,11 @@ class DagModel(Base):
continue
@classmethod
- def dags_needing_dagruns(cls, session: Session) -> Tuple[Query, Dict]:
+ def dags_needing_dagruns(cls, session: Session) -> Tuple[Query, Dict[str, Tuple[datetime, datetime]]]:
"""
Return (and lock) a list of Dag objects that are due to create a new DagRun.
- This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query,
+ This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query,
you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
transaction is committed it will be unlocked.
"""
diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py
index 779534116b..2c78d189c1 100644
--- a/airflow/timetables/base.py
+++ b/airflow/timetables/base.py
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Any, Dict, NamedTuple, Optional
+from typing import Any, Dict, NamedTuple, Optional, Sequence
from pendulum import DateTime
@@ -125,6 +125,12 @@ class Timetable(Protocol):
this to *False*.
"""
+ run_ordering: Sequence[str] = ("data_interval_end", "execution_date")
+ """How runs triggered from this timetable should be ordered in the UI.
+
+ This should be a list of field names on the DAG run object.
+ """
+
@classmethod
def deserialize(cls, data: Dict[str, Any]) -> "Timetable":
"""Deserialize a timetable from data.
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index 80e2232108..ba3ed84721 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -27,6 +27,7 @@ class _TrivialTimetable(Timetable):
periodic = False
can_run = False
+ run_ordering = ("execution_date",)
@classmethod
def deserialize(cls, data: Dict[str, Any]) -> "Timetable":
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 44771df4dd..7febf9a31c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3624,8 +3624,10 @@ class Airflow(AirflowBaseView):
if run_state:
query = query.filter(DagRun.state == run_state)
- dag_runs = query.order_by(DagRun.execution_date.desc()).limit(num_runs).all()
+ ordering = (DagRun.__table__.columns[name].desc() for name in dag.timetable.run_ordering)
+ dag_runs = query.order_by(*ordering, DagRun.id.desc()).limit(num_runs).all()
dag_runs.reverse()
+
encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs]
data = {
'groups': dag_to_grid(dag, dag_runs, session),
diff --git a/newsfragments/25090.significant.rst b/newsfragments/25090.significant.rst
new file mode 100644
index 0000000000..25ea155a36
--- /dev/null
+++ b/newsfragments/25090.significant.rst
@@ -0,0 +1,5 @@
+DAG runs sorting logic changed in grid view
+
+The ordering of DAG runs in the grid view has been changed to be more "natural".
+The new logic generally orders by data interval, but a custom ordering can be
+applied by using a custom timetable that overrides ``run_ordering``.