You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/09/26 15:09:19 UTC
[airflow] 10/13: Use COALESCE when ordering runs to handle NULL (#26626)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit a6bc5d0eb12c70e2552be030fd3638ef4aab7016
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Mon Sep 26 20:59:08 2022 +0800
Use COALESCE when ordering runs to handle NULL (#26626)
Data interval columns are NULL for runs created before 2.3, but SQL's
NULL-sorting logic would make those old runs always appear first. In a
perfect world we'd want to sort by get_run_data_interval(), but that's
not efficient, so instead the columns are coalesced into logical date,
which is good enough in most cases.
(cherry picked from commit 22d52c00f6397fde8d97cf2479c0614671f5b5ba)
---
airflow/www/utils.py | 44 ++++++++++++++++++++++++++++++++++++++------
airflow/www/views.py | 5 +----
2 files changed, 39 insertions(+), 10 deletions(-)
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 3429d6a140..0aaaf2b26e 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -20,10 +20,9 @@ from __future__ import annotations
import json
import textwrap
import time
-from typing import Any
+from typing import TYPE_CHECKING, Any, Sequence
from urllib.parse import urlencode
-import sqlalchemy as sqla
from flask import request, url_for
from flask.helpers import flash
from flask_appbuilder.forms import FieldConverter
@@ -37,11 +36,12 @@ from markupsafe import Markup
from pendulum.datetime import DateTime
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
+from sqlalchemy import func, types
from sqlalchemy.ext.associationproxy import AssociationProxy
-from airflow import models
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models import errors
+from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
@@ -51,6 +51,10 @@ from airflow.utils.state import State, TaskInstanceState
from airflow.www.forms import DateTimeWithTimezoneField
from airflow.www.widgets import AirflowDateTimePickerWidget
+if TYPE_CHECKING:
+ from sqlalchemy.orm.query import Query
+ from sqlalchemy.sql.operators import ColumnOperators
+
def datetime_to_string(value: DateTime | None) -> str | None:
if value is None:
@@ -129,7 +133,7 @@ def get_mapped_summary(parent_instance, task_instances):
}
-def encode_dag_run(dag_run: models.DagRun | None) -> dict[str, Any] | None:
+def encode_dag_run(dag_run: DagRun | None) -> dict[str, Any] | None:
if not dag_run:
return None
@@ -436,6 +440,34 @@ def dag_run_link(attr):
return Markup('<a href="{url}">{run_id}</a>').format(url=url, run_id=run_id)
+def _get_run_ordering_expr(name: str) -> ColumnOperators:
+ expr = DagRun.__table__.columns[name]
+ # Data interval columns are NULL for runs created before 2.3, but SQL's
+ # NULL-sorting logic would make those old runs always appear first. In a
+ # perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+ # not efficient, so instead the columns are coalesced into execution_date,
+ # which is good enough in most cases.
+ if name in ("data_interval_start", "data_interval_end"):
+ expr = func.coalesce(expr, DagRun.execution_date)
+ return expr.desc()
+
+
+def sorted_dag_runs(query: Query, *, ordering: Sequence[str], limit: int) -> Sequence[DagRun]:
+ """Produce DAG runs sorted by specified columns.
+
+ :param query: An ORM query object against *DagRun*.
+    :param ordering: Column names to sort the runs. Should generally come from a
+        timetable's ``run_ordering``.
+ :param limit: Number of runs to limit to.
+ :return: A list of DagRun objects ordered by the specified columns. The list
+ contains only the *last* objects, but in *ascending* order.
+ """
+ ordering_exprs = (_get_run_ordering_expr(name) for name in ordering)
+ runs = query.order_by(*ordering_exprs, DagRun.id.desc()).limit(limit).all()
+ runs.reverse()
+ return runs
+
+
def format_map_index(attr: dict) -> str:
"""Format map index for list columns in model view."""
value = attr['map_index']
@@ -651,7 +683,7 @@ class CustomSQLAInterface(SQLAInterface):
obj = self.list_columns[col_name].type
return (
isinstance(obj, UtcDateTime)
- or isinstance(obj, sqla.types.TypeDecorator)
+ or isinstance(obj, types.TypeDecorator)
and isinstance(obj.impl, UtcDateTime)
)
return False
@@ -664,7 +696,7 @@ class CustomSQLAInterface(SQLAInterface):
obj = self.list_columns[col_name].type
return (
isinstance(obj, ExtendedJSON)
- or isinstance(obj, sqla.types.TypeDecorator)
+ or isinstance(obj, types.TypeDecorator)
and isinstance(obj.impl, ExtendedJSON)
)
return False
diff --git a/airflow/www/views.py b/airflow/www/views.py
index cc85e23b11..b1d3c1209b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3454,10 +3454,7 @@ class Airflow(AirflowBaseView):
if run_state:
query = query.filter(DagRun.state == run_state)
- ordering = (DagRun.__table__.columns[name].desc() for name in dag.timetable.run_ordering)
- dag_runs = query.order_by(*ordering, DagRun.id.desc()).limit(num_runs).all()
- dag_runs.reverse()
-
+ dag_runs = wwwutils.sorted_dag_runs(query, ordering=dag.timetable.run_ordering, limit=num_runs)
encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs]
data = {
'groups': dag_to_grid(dag, dag_runs, session),