Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/15 20:52:00 UTC

[airflow] 02/05: Show task status only for running dags or only for the last finished dag (#21352)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 57abbac689472fd04db036e0dc06794971a0a68e
Author: Aleksey Kirilishin <54...@users.noreply.github.com>
AuthorDate: Mon Feb 14 18:55:00 2022 +0300

    Show task status only for running dags or only for the last finished dag (#21352)
    
    * Show task status only for running dags or only for the last finished dag
    
    * Brought the logic of getting task statistics into a separate function
    
    (cherry picked from commit 28d7bde2750c38300e5cf70ba32be153b1a11f2c)
---
 airflow/www/views.py          | 64 ++++++++++++++++++++++++++++++++++---------
 tests/www/views/test_views.py | 35 ++++++++++++++++++++++-
 2 files changed, 85 insertions(+), 14 deletions(-)
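For orientation before the diff: the patch tags each task-instance row with a boolean literal marking whether it comes from a running dag run, unions the "running" and "last finished" selections, then groups and orders so that, per dag, running rows sort ahead of rows from the most recent finished run. Below is a minimal standalone sketch of that SQLAlchemy pattern on a toy table; the table, column names, and data are illustrative assumptions, not Airflow's real models, and the 1.4+ select() style is assumed.

# Sketch of the query shape used in the patch: tag rows with a boolean
# literal, union the "running" and "last finished" selections, then group
# and order so running rows come first per dag. Toy schema, not Airflow's.
import sqlalchemy as sqla
from sqlalchemy import Column, Integer, MetaData, String, Table, select, union_all

metadata = MetaData()
ti = Table(
    'toy_task_instance', metadata,
    Column('id', Integer, primary_key=True),
    Column('dag_id', String),
    Column('state', String),
    Column('run_kind', String),  # 'running' or 'finished'; stands in for the dag_run joins
)

engine = sqla.create_engine('sqlite://')
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(ti.insert(), [
        {'dag_id': 'dag1', 'state': 'running', 'run_kind': 'running'},
        {'dag_id': 'dag1', 'state': 'success', 'run_kind': 'finished'},
    ])

running = select(
    ti.c.dag_id,
    ti.c.state,
    sqla.literal(True).label('is_dag_running'),
).where(ti.c.run_kind == 'running')

finished = select(
    ti.c.dag_id,
    ti.c.state,
    sqla.literal(False).label('is_dag_running'),
).where(ti.c.run_kind == 'finished')

combined = union_all(running, finished).subquery('final_ti')

qry = (
    select(
        combined.c.dag_id,
        combined.c.state,
        combined.c.is_dag_running,
        sqla.func.count(),
    )
    .group_by(combined.c.dag_id, combined.c.state, combined.c.is_dag_running)
    .order_by(combined.c.dag_id, combined.c.is_dag_running.desc())
)

with engine.connect() as conn:
    # (dag_id, state, is_dag_running, count) tuples; running rows first per dag.
    # On SQLite the boolean literal comes back as 1/0.
    rows = conn.execute(qry).all()
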

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 2ed2a67..9ebe899 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -408,6 +408,31 @@ def dag_edges(dag):
     return result
 
 
+def get_task_stats_from_query(qry):
+    """
+    Return a dict of the task quantity, grouped by dag id and task status.
+
+    :param qry: The data in the format (<dag id>, <task state>, <is dag running>, <task count>),
+        ordered by <dag id> and <is dag running>
+    """
+    data = {}
+    last_dag_id = None
+    has_running_dags = False
+    for dag_id, state, is_dag_running, count in qry:
+        if last_dag_id != dag_id:
+            last_dag_id = dag_id
+            has_running_dags = False
+        elif not is_dag_running and has_running_dags:
+            continue
+
+        if is_dag_running:
+            has_running_dags = True
+        if dag_id not in data:
+            data[dag_id] = {}
+        data[dag_id][state] = count
+    return data
+
+
 ######################################################################################
 #                                    Error handlers
 ######################################################################################
@@ -814,7 +839,9 @@ class Airflow(AirflowBaseView):
 
         # Select all task_instances from active dag_runs.
         running_task_instance_query_result = session.query(
-            TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state')
+            TaskInstance.dag_id.label('dag_id'),
+            TaskInstance.state.label('state'),
+            sqla.literal(True).label('is_dag_running'),
         ).join(
             running_dag_run_query_result,
             and_(
@@ -838,7 +865,11 @@ class Airflow(AirflowBaseView):
             # Select all task_instances from active dag_runs.
             # If no dag_run is active, return task instances from most recent dag_run.
             last_task_instance_query_result = (
-                session.query(TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state'))
+                session.query(
+                    TaskInstance.dag_id.label('dag_id'),
+                    TaskInstance.state.label('state'),
+                    sqla.literal(False).label('is_dag_running'),
+                )
                 .join(TaskInstance.dag_run)
                 .join(
                     last_dag_run,
@@ -855,18 +886,25 @@ class Airflow(AirflowBaseView):
         else:
             final_task_instance_query_result = running_task_instance_query_result.subquery('final_ti')
 
-        qry = session.query(
-            final_task_instance_query_result.c.dag_id,
-            final_task_instance_query_result.c.state,
-            sqla.func.count(),
-        ).group_by(final_task_instance_query_result.c.dag_id, final_task_instance_query_result.c.state)
-
-        data = {}
-        for dag_id, state, count in qry:
-            if dag_id not in data:
-                data[dag_id] = {}
-            data[dag_id][state] = count
+        qry = (
+            session.query(
+                final_task_instance_query_result.c.dag_id,
+                final_task_instance_query_result.c.state,
+                final_task_instance_query_result.c.is_dag_running,
+                sqla.func.count(),
+            )
+            .group_by(
+                final_task_instance_query_result.c.dag_id,
+                final_task_instance_query_result.c.state,
+                final_task_instance_query_result.c.is_dag_running,
+            )
+            .order_by(
+                final_task_instance_query_result.c.dag_id,
+                final_task_instance_query_result.c.is_dag_running.desc(),
+            )
+        )
 
+        data = get_task_stats_from_query(qry)
         payload = {}
         for dag_id in filter_dag_ids:
             payload[dag_id] = []
diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py
index b98c1bc..672d4a1 100644
--- a/tests/www/views/test_views.py
+++ b/tests/www/views/test_views.py
@@ -24,7 +24,13 @@ import pytest
 from airflow.configuration import initialize_config
 from airflow.plugins_manager import AirflowPlugin, EntryPointSource
 from airflow.www import views
-from airflow.www.views import get_key_paths, get_safe_url, get_value_from_path, truncate_task_duration
+from airflow.www.views import (
+    get_key_paths,
+    get_safe_url,
+    get_task_stats_from_query,
+    get_value_from_path,
+    truncate_task_duration,
+)
 from tests.test_utils.config import conf_vars
 from tests.test_utils.mock_plugins import mock_plugin_manager
 from tests.test_utils.www import check_content_in_response, check_content_not_in_response
@@ -333,3 +339,30 @@ def test_dag_edit_privileged_requires_view_has_action_decorators(cls: type):
     action_funcs = action_funcs - {"action_post"}
     for action_function in action_funcs:
         assert_decorator_used(cls, action_function, views.action_has_dag_edit_access)
+
+
+def test_get_task_stats_from_query():
+    query_data = [
+        ['dag1', 'queued', True, 1],
+        ['dag1', 'running', True, 2],
+        ['dag1', 'success', False, 3],
+        ['dag2', 'running', True, 4],
+        ['dag2', 'success', True, 5],
+        ['dag3', 'success', False, 6],
+    ]
+    expected_data = {
+        'dag1': {
+            'queued': 1,
+            'running': 2,
+        },
+        'dag2': {
+            'running': 4,
+            'success': 5,
+        },
+        'dag3': {
+            'success': 6,
+        },
+    }
+
+    data = get_task_stats_from_query(query_data)
+    assert data == expected_data