You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/06/29 15:20:11 UTC

[airflow] 26/45: Fix flaky order of returned dag runs (#24405)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit fa50004b7dff12bf8431ac1833ea949b43351ad0
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Mon Jun 13 09:04:07 2022 +0200

    Fix flaky order of returned dag runs (#24405)
    
    The query that fetches task instances for the dag runs shown in the
    grid view had no ordering. This caused flaky tests, and it would
    likely also cause the dag runs reported in the UI to be randomly
    reordered.
    
    This change adds stable ordering on returned Dag Runs:
    
    * by run_id (ascending)
    
    No need to also order by map_index, as there will always be at most
    one returned TI per dag run (the query only selects map_index <= 0).
    
    (cherry picked from commit 2edab57d4e8ccbd5b8f66c3951615c169fb0543e)
---
 airflow/www/utils.py | 34 +++++++++++++++++++---------------
 airflow/www/views.py |  2 --
 2 files changed, 19 insertions(+), 17 deletions(-)

diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 3c63584e88..feeedd0cef 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -130,21 +130,25 @@ def get_mapped_summary(parent_instance, task_instances):
 
 
 def get_task_summaries(task, dag_runs: List[DagRun], session: Session) -> List[Dict[str, Any]]:
-    tis = session.query(
-        TaskInstance.dag_id,
-        TaskInstance.task_id,
-        TaskInstance.run_id,
-        TaskInstance.map_index,
-        TaskInstance.state,
-        TaskInstance.start_date,
-        TaskInstance.end_date,
-        TaskInstance._try_number,
-    ).filter(
-        TaskInstance.dag_id == task.dag_id,
-        TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
-        TaskInstance.task_id == task.task_id,
-        # Only get normal task instances or the first mapped task
-        TaskInstance.map_index <= 0,
+    tis = (
+        session.query(
+            TaskInstance.dag_id,
+            TaskInstance.task_id,
+            TaskInstance.run_id,
+            TaskInstance.map_index,
+            TaskInstance.state,
+            TaskInstance.start_date,
+            TaskInstance.end_date,
+            TaskInstance._try_number,
+        )
+        .filter(
+            TaskInstance.dag_id == task.dag_id,
+            TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
+            TaskInstance.task_id == task.task_id,
+            # Only get normal task instances or the first mapped task
+            TaskInstance.map_index <= 0,
+        )
+        .order_by(TaskInstance.run_id.asc())
     )
 
     def _get_summary(task_instance):
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 0cea403fb9..fe4217c452 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3543,7 +3543,6 @@ class Airflow(AirflowBaseView):
                 'groups': task_group_to_grid(dag.task_group, dag, dag_runs, session),
                 'dag_runs': encoded_runs,
             }
-
         # avoid spaces to reduce payload size
         return htmlsafe_json_dumps(data, separators=(',', ':'))
 
@@ -3581,7 +3580,6 @@ class Airflow(AirflowBaseView):
             query = query.filter(Log.event.notin_(excluded_events))
 
         dag_audit_logs = query.all()
-
         content = self.render_template(
             'airflow/dag_audit_log.html',
             dag=dag,