Posted to commits@airflow.apache.org by je...@apache.org on 2022/09/26 15:09:19 UTC

[airflow] 10/13: Use COALESCE when ordering runs to handle NULL (#26626)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a6bc5d0eb12c70e2552be030fd3638ef4aab7016
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Mon Sep 26 20:59:08 2022 +0800

    Use COALESCE when ordering runs to handle NULL (#26626)
    
    Data interval columns are NULL for runs created before 2.3, but SQL's
    NULL-sorting logic would make those old runs always appear first. In a
    perfect world we'd want to sort by get_run_data_interval(), but that's
    not efficient, so instead the columns are coalesced into logical date,
    which is good enough in most cases.
    
    (cherry picked from commit 22d52c00f6397fde8d97cf2479c0614671f5b5ba)
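    For illustration, the heart of the change is the ordering expression itself.
    Below is a minimal standalone sketch of the same COALESCE idea; the table
    and column definitions are simplified stand-ins for the example, not
    Airflow's actual models, and the real fix lives in airflow/www/utils.py in
    the diff that follows.

        from sqlalchemy import Column, DateTime, Integer, MetaData, Table, func, select

        metadata = MetaData()
        dag_run = Table(
            "dag_run",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("execution_date", DateTime, nullable=False),
            Column("data_interval_start", DateTime),  # NULL for runs created before 2.3
        )

        # COALESCE makes NULL data intervals fall back to the logical (execution)
        # date, so pre-2.3 runs sort in their natural position instead of being
        # pushed to one end of the result set by SQL's NULL-sorting rules.
        ordering = func.coalesce(dag_run.c.data_interval_start, dag_run.c.execution_date).desc()
        stmt = select(dag_run).order_by(ordering, dag_run.c.id.desc()).limit(10)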
---
 airflow/www/utils.py | 44 ++++++++++++++++++++++++++++++++++++++------
 airflow/www/views.py |  5 +----
 2 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 3429d6a140..0aaaf2b26e 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -20,10 +20,9 @@ from __future__ import annotations
 import json
 import textwrap
 import time
-from typing import Any
+from typing import TYPE_CHECKING, Any, Sequence
 from urllib.parse import urlencode
 
-import sqlalchemy as sqla
 from flask import request, url_for
 from flask.helpers import flash
 from flask_appbuilder.forms import FieldConverter
@@ -37,11 +36,12 @@ from markupsafe import Markup
 from pendulum.datetime import DateTime
 from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter
+from sqlalchemy import func, types
 from sqlalchemy.ext.associationproxy import AssociationProxy
 
-from airflow import models
 from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.models import errors
+from airflow.models.dagrun import DagRun
 from airflow.models.dagwarning import DagWarning
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
@@ -51,6 +51,10 @@ from airflow.utils.state import State, TaskInstanceState
 from airflow.www.forms import DateTimeWithTimezoneField
 from airflow.www.widgets import AirflowDateTimePickerWidget
 
+if TYPE_CHECKING:
+    from sqlalchemy.orm.query import Query
+    from sqlalchemy.sql.operators import ColumnOperators
+
 
 def datetime_to_string(value: DateTime | None) -> str | None:
     if value is None:
@@ -129,7 +133,7 @@ def get_mapped_summary(parent_instance, task_instances):
     }
 
 
-def encode_dag_run(dag_run: models.DagRun | None) -> dict[str, Any] | None:
+def encode_dag_run(dag_run: DagRun | None) -> dict[str, Any] | None:
     if not dag_run:
         return None
 
@@ -436,6 +440,34 @@ def dag_run_link(attr):
     return Markup('<a href="{url}">{run_id}</a>').format(url=url, run_id=run_id)
 
 
+def _get_run_ordering_expr(name: str) -> ColumnOperators:
+    expr = DagRun.__table__.columns[name]
+    # Data interval columns are NULL for runs created before 2.3, but SQL's
+    # NULL-sorting logic would make those old runs always appear first. In a
+    # perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+    # not efficient, so instead the columns are coalesced into execution_date,
+    # which is good enough in most cases.
+    if name in ("data_interval_start", "data_interval_end"):
+        expr = func.coalesce(expr, DagRun.execution_date)
+    return expr.desc()
+
+
+def sorted_dag_runs(query: Query, *, ordering: Sequence[str], limit: int) -> Sequence[DagRun]:
+    """Produce DAG runs sorted by specified columns.
+
+    :param query: An ORM query object against *DagRun*.
+    :param ordering: Column names to sort the runs; should generally come from a
+        timetable's ``run_ordering``.
+    :param limit: Number of runs to limit to.
+    :return: A list of DagRun objects ordered by the specified columns. The list
+        contains only the *last* objects, but in *ascending* order.
+    """
+    ordering_exprs = (_get_run_ordering_expr(name) for name in ordering)
+    runs = query.order_by(*ordering_exprs, DagRun.id.desc()).limit(limit).all()
+    runs.reverse()
+    return runs
+
+
 def format_map_index(attr: dict) -> str:
     """Format map index for list columns in model view."""
     value = attr['map_index']
@@ -651,7 +683,7 @@ class CustomSQLAInterface(SQLAInterface):
             obj = self.list_columns[col_name].type
             return (
                 isinstance(obj, UtcDateTime)
-                or isinstance(obj, sqla.types.TypeDecorator)
+                or isinstance(obj, types.TypeDecorator)
                 and isinstance(obj.impl, UtcDateTime)
             )
         return False
@@ -664,7 +696,7 @@ class CustomSQLAInterface(SQLAInterface):
             obj = self.list_columns[col_name].type
             return (
                 isinstance(obj, ExtendedJSON)
-                or isinstance(obj, sqla.types.TypeDecorator)
+                or isinstance(obj, types.TypeDecorator)
                 and isinstance(obj.impl, ExtendedJSON)
             )
         return False
diff --git a/airflow/www/views.py b/airflow/www/views.py
index cc85e23b11..b1d3c1209b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3454,10 +3454,7 @@ class Airflow(AirflowBaseView):
             if run_state:
                 query = query.filter(DagRun.state == run_state)
 
-            ordering = (DagRun.__table__.columns[name].desc() for name in dag.timetable.run_ordering)
-            dag_runs = query.order_by(*ordering, DagRun.id.desc()).limit(num_runs).all()
-            dag_runs.reverse()
-
+            dag_runs = wwwutils.sorted_dag_runs(query, ordering=dag.timetable.run_ordering, limit=num_runs)
             encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs]
             data = {
                 'groups': dag_to_grid(dag, dag_runs, session),
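
For context, a rough usage sketch of the new ``sorted_dag_runs`` helper from
view or plugin code follows. The DAG id, ordering tuple, and limit are example
values, and the session handling shown is one common pattern rather than the
exact grid-view code above.

    from airflow.models.dagrun import DagRun
    from airflow.utils.session import create_session
    from airflow.www import utils as wwwutils

    with create_session() as session:
        query = session.query(DagRun).filter(DagRun.dag_id == "example_dag")
        # Timetables typically order by ("data_interval_end",); NULL intervals
        # now fall back to execution_date inside the helper, and the result is
        # the most recent 25 runs returned in ascending order.
        runs = wwwutils.sorted_dag_runs(query, ordering=("data_interval_end",), limit=25)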