You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/13 07:04:13 UTC

[airflow] branch main updated: Fix flaky order of returned dag runs (#24405)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2edab57d4e Fix flaky order of returned dag runs (#24405)
2edab57d4e is described below

commit 2edab57d4e8ccbd5b8f66c3951615c169fb0543e
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Mon Jun 13 09:04:07 2022 +0200

    Fix flaky order of returned dag runs (#24405)
    
    The query that returns task instances for the dag runs shown in the
    grid view had no ORDER BY clause. This caused flaky tests, and it
    could also have caused the reported dag runs to be randomly
    reordered in the UI.
    
    This change adds a stable ordering to the returned task instances:
    
    * by run_id, ascending
    
    There is no need to also order by map_index, because at most one
    TI is returned for each dag run.
---
 airflow/www/utils.py | 34 +++++++++++++++++++---------------
 airflow/www/views.py |  2 --
 2 files changed, 19 insertions(+), 17 deletions(-)

diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 8c05f37885..63e6921ac4 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -130,21 +130,25 @@ def get_mapped_summary(parent_instance, task_instances):
 
 
 def get_task_summaries(task, dag_runs: List[DagRun], session: Session) -> List[Dict[str, Any]]:
-    tis = session.query(
-        TaskInstance.dag_id,
-        TaskInstance.task_id,
-        TaskInstance.run_id,
-        TaskInstance.map_index,
-        TaskInstance.state,
-        TaskInstance.start_date,
-        TaskInstance.end_date,
-        TaskInstance._try_number,
-    ).filter(
-        TaskInstance.dag_id == task.dag_id,
-        TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
-        TaskInstance.task_id == task.task_id,
-        # Only get normal task instances or the first mapped task
-        TaskInstance.map_index <= 0,
+    tis = (
+        session.query(
+            TaskInstance.dag_id,
+            TaskInstance.task_id,
+            TaskInstance.run_id,
+            TaskInstance.map_index,
+            TaskInstance.state,
+            TaskInstance.start_date,
+            TaskInstance.end_date,
+            TaskInstance._try_number,
+        )
+        .filter(
+            TaskInstance.dag_id == task.dag_id,
+            TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
+            TaskInstance.task_id == task.task_id,
+            # Only get normal task instances or the first mapped task
+            TaskInstance.map_index <= 0,
+        )
+        .order_by(TaskInstance.run_id.asc())
     )
 
     def _get_summary(task_instance):
diff --git a/airflow/www/views.py b/airflow/www/views.py
index fe8bf5c027..d2623182f9 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3538,7 +3538,6 @@ class Airflow(AirflowBaseView):
                 'groups': task_group_to_grid(dag.task_group, dag, dag_runs, session),
                 'dag_runs': encoded_runs,
             }
-
         # avoid spaces to reduce payload size
         return (
             htmlsafe_json_dumps(data, separators=(',', ':')),
@@ -3579,7 +3578,6 @@ class Airflow(AirflowBaseView):
             query = query.filter(Log.event.notin_(excluded_events))
 
         dag_audit_logs = query.all()
-
         content = self.render_template(
             'airflow/dag_audit_log.html',
             dag=dag,