Posted to commits@airflow.apache.org by ur...@apache.org on 2022/08/18 06:28:15 UTC

[airflow] branch main updated: Allow per-timetable ordering override in grid view (#25633)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 36eea1c8e0 Allow per-timetable ordering override in grid view (#25633)
36eea1c8e0 is described below

commit 36eea1c8e05a6791d144e74f4497855e35baeaac
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Thu Aug 18 14:28:06 2022 +0800

    Allow per-timetable ordering override in grid view (#25633)
---
 airflow/jobs/scheduler_job.py       | 23 ++++++++++-------------
 airflow/models/dag.py               |  4 ++--
 airflow/timetables/base.py          |  8 +++++++-
 airflow/timetables/simple.py        |  1 +
 airflow/www/views.py                |  4 +++-
 newsfragments/25090.significant.rst |  5 +++++
 6 files changed, 28 insertions(+), 17 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 86973fc9e4..8ca31dd1c9 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -25,7 +25,7 @@ import sys
 import time
 import warnings
 from collections import defaultdict
-from datetime import timedelta
+from datetime import datetime, timedelta
 from typing import TYPE_CHECKING, Collection, DefaultDict, Dict, Iterator, List, Optional, Set, Tuple
 
 from sqlalchemy import func, not_, or_, text
@@ -1059,25 +1059,21 @@ class SchedulerJob(BaseJob):
         # memory for larger dags? or expunge_all()
 
     def _create_dag_runs_dataset_triggered(
-        self, dag_models: Collection[DagModel], dataset_triggered_dag_info: Dict, session: Session
+        self,
+        dag_models: Collection[DagModel],
+        dataset_triggered_dag_info: Dict[str, Tuple[datetime, datetime]],
+        session: Session,
     ) -> None:
         """For DAGs that are triggered by datasets, create dag runs."""
         # Bulk Fetch DagRuns with dag_id and execution_date same
         # as DagModel.dag_id and DagModel.next_dagrun
         # This list is used to verify if the DagRun already exist so that we don't attempt to create
         # duplicate dag runs
-        exec_dates = {
-            dag_id: last_time for dag_id, (first_time, last_time) in dataset_triggered_dag_info.items()
-        }
-        existing_dagruns = (
-            session.query(DagRun.dag_id, DagRun.execution_date)
-            .filter(
-                tuple_in_condition(
-                    (DagRun.dag_id, DagRun.execution_date),
-                    list(exec_dates.items()),
-                ),
+        exec_dates = {dag_id: last_time for dag_id, (_, last_time) in dataset_triggered_dag_info.items()}
+        existing_dagruns: Set[Tuple[str, datetime]] = set(
+            session.query(DagRun.dag_id, DagRun.execution_date).filter(
+                tuple_in_condition((DagRun.dag_id, DagRun.execution_date), exec_dates.items())
             )
-            .all()
         )
 
         for dag_model in dag_models:
@@ -1101,6 +1097,7 @@ class SchedulerJob(BaseJob):
                 dag_run = dag.create_dagrun(
                     run_type=DagRunType.DATASET_TRIGGERED,
                     execution_date=exec_date,
+                    data_interval=(exec_date, exec_date),
                     state=DagRunState.QUEUED,
                     external_trigger=False,
                     session=session,
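
For context, the rewritten block bulk-fetches the existing (dag_id, execution_date)
pairs once and checks membership against a set, instead of materializing a list with
.all(). A minimal, self-contained sketch of that dedupe logic (the dictionary
contents below are invented for illustration):

    from datetime import datetime
    from typing import Dict, Set, Tuple

    # Mirrors dataset_triggered_dag_info: dag_id -> (first_event, last_event)
    info: Dict[str, Tuple[datetime, datetime]] = {
        "example_dag": (datetime(2022, 8, 1), datetime(2022, 8, 18)),
    }

    # Keep only the last event time per DAG, as the patch does.
    exec_dates = {dag_id: last for dag_id, (_, last) in info.items()}

    # In the real code this set is filled by a single query using
    # tuple_in_condition; membership tests are then O(1) per DAG.
    existing_dagruns: Set[Tuple[str, datetime]] = set()

    for dag_id, exec_date in exec_dates.items():
        if (dag_id, exec_date) in existing_dagruns:
            continue  # run already exists; do not create a duplicate
        print(f"would create a dataset-triggered run for {dag_id} at {exec_date}")
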
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 6fa9067ffc..42b9b6ea15 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3157,11 +3157,11 @@ class DagModel(Base):
                 continue
 
     @classmethod
-    def dags_needing_dagruns(cls, session: Session) -> Tuple[Query, Dict]:
+    def dags_needing_dagruns(cls, session: Session) -> Tuple[Query, Dict[str, Tuple[datetime, datetime]]]:
         """
         Return (and lock) a list of Dag objects that are due to create a new DagRun.
 
-        This will return a resultset of rows  that is row-level-locked with a "SELECT ... FOR UPDATE" query,
+        This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query,
         you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
         transaction is committed it will be unlocked.
         """
diff --git a/airflow/timetables/base.py b/airflow/timetables/base.py
index 779534116b..2c78d189c1 100644
--- a/airflow/timetables/base.py
+++ b/airflow/timetables/base.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Any, Dict, NamedTuple, Optional
+from typing import Any, Dict, NamedTuple, Optional, Sequence
 
 from pendulum import DateTime
 
@@ -125,6 +125,12 @@ class Timetable(Protocol):
     this to *False*.
     """
 
+    run_ordering: Sequence[str] = ("data_interval_end", "execution_date")
+    """How runs triggered from this timetable should be ordered in UI.
+
+    This should be a list of field names on the DAG run object.
+    """
+
     @classmethod
     def deserialize(cls, data: Dict[str, Any]) -> "Timetable":
         """Deserialize a timetable from data.
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index 80e2232108..ba3ed84721 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -27,6 +27,7 @@ class _TrivialTimetable(Timetable):
 
     periodic = False
     can_run = False
+    run_ordering = ("execution_date",)
 
     @classmethod
     def deserialize(cls, data: Dict[str, Any]) -> "Timetable":
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 44771df4dd..7febf9a31c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3624,8 +3624,10 @@ class Airflow(AirflowBaseView):
             if run_state:
                 query = query.filter(DagRun.state == run_state)
 
-            dag_runs = query.order_by(DagRun.execution_date.desc()).limit(num_runs).all()
+            ordering = (DagRun.__table__.columns[name].desc() for name in dag.timetable.run_ordering)
+            dag_runs = query.order_by(*ordering, DagRun.id.desc()).limit(num_runs).all()
             dag_runs.reverse()
+
             encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs]
             data = {
                 'groups': dag_to_grid(dag, dag_runs, session),
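
For context, with the default run_ordering of ("data_interval_end",
"execution_date"), the rewritten query is roughly equivalent to the following
(a sketch; session and num_runs are assumed from the surrounding view code):

    dag_runs = (
        session.query(DagRun)
        .order_by(
            DagRun.data_interval_end.desc(),
            DagRun.execution_date.desc(),
            DagRun.id.desc(),  # stable tie-breaker for identical timestamps
        )
        .limit(num_runs)
        .all()
    )
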
diff --git a/newsfragments/25090.significant.rst b/newsfragments/25090.significant.rst
new file mode 100644
index 0000000000..25ea155a36
--- /dev/null
+++ b/newsfragments/25090.significant.rst
@@ -0,0 +1,5 @@
+DAG runs sorting logic changed in grid view
+
+The ordering of DAG runs in the grid view has been changed to be more "natural".
+The new logic generally orders by data interval, but a custom ordering can be
+applied by setting the DAG to use a custom timetable.
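
For example, a user opts into a custom ordering by assigning such a timetable
to the DAG; MyTimetable and its module below are hypothetical stand-ins for
any timetable that sets run_ordering:

    from airflow import DAG
    from my_plugin.timetables import MyTimetable  # hypothetical module

    with DAG(
        dag_id="custom_ordered_dag",
        timetable=MyTimetable(),  # its run_ordering drives the grid view
    ):
        ...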