You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/15 20:52:00 UTC
[airflow] 02/05: Show task status only for running dags or only for the last finished dag (#21352)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 57abbac689472fd04db036e0dc06794971a0a68e
Author: Aleksey Kirilishin <54...@users.noreply.github.com>
AuthorDate: Mon Feb 14 18:55:00 2022 +0300
Show task status only for running dags or only for the last finished dag (#21352)
* Show task statuses only from running dag runs, or, when a dag has no running run, from its last finished dag run
* Moved the logic for computing task statistics into a separate function
(cherry picked from commit 28d7bde2750c38300e5cf70ba32be153b1a11f2c)
---
airflow/www/views.py | 64 ++++++++++++++++++++++++++++++++++---------
tests/www/views/test_views.py | 35 ++++++++++++++++++++++-
2 files changed, 85 insertions(+), 14 deletions(-)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 2ed2a67..9ebe899 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -408,6 +408,31 @@ def dag_edges(dag):
return result
+def get_task_stats_from_query(qry):
+ """
+ Return a dict of the task quantity, grouped by dag id and task status.
+
+ :param qry: The data in the format (<dag id>, <task state>, <is dag running>, <task count>),
+ ordered by <dag id> and <is dag running>
+ """
+ data = {}
+ last_dag_id = None
+ has_running_dags = False
+ for dag_id, state, is_dag_running, count in qry:
+ if last_dag_id != dag_id:
+ last_dag_id = dag_id
+ has_running_dags = False
+ elif not is_dag_running and has_running_dags:
+ continue
+
+ if is_dag_running:
+ has_running_dags = True
+ if dag_id not in data:
+ data[dag_id] = {}
+ data[dag_id][state] = count
+ return data
+
+
######################################################################################
# Error handlers
######################################################################################
@@ -814,7 +839,9 @@ class Airflow(AirflowBaseView):
# Select all task_instances from active dag_runs.
running_task_instance_query_result = session.query(
- TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state')
+ TaskInstance.dag_id.label('dag_id'),
+ TaskInstance.state.label('state'),
+ sqla.literal(True).label('is_dag_running'),
).join(
running_dag_run_query_result,
and_(
@@ -838,7 +865,11 @@ class Airflow(AirflowBaseView):
# Select all task_instances from active dag_runs.
# If no dag_run is active, return task instances from most recent dag_run.
last_task_instance_query_result = (
- session.query(TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state'))
+ session.query(
+ TaskInstance.dag_id.label('dag_id'),
+ TaskInstance.state.label('state'),
+ sqla.literal(False).label('is_dag_running'),
+ )
.join(TaskInstance.dag_run)
.join(
last_dag_run,
@@ -855,18 +886,25 @@ class Airflow(AirflowBaseView):
else:
final_task_instance_query_result = running_task_instance_query_result.subquery('final_ti')
- qry = session.query(
- final_task_instance_query_result.c.dag_id,
- final_task_instance_query_result.c.state,
- sqla.func.count(),
- ).group_by(final_task_instance_query_result.c.dag_id, final_task_instance_query_result.c.state)
-
- data = {}
- for dag_id, state, count in qry:
- if dag_id not in data:
- data[dag_id] = {}
- data[dag_id][state] = count
+ qry = (
+ session.query(
+ final_task_instance_query_result.c.dag_id,
+ final_task_instance_query_result.c.state,
+ final_task_instance_query_result.c.is_dag_running,
+ sqla.func.count(),
+ )
+ .group_by(
+ final_task_instance_query_result.c.dag_id,
+ final_task_instance_query_result.c.state,
+ final_task_instance_query_result.c.is_dag_running,
+ )
+ .order_by(
+ final_task_instance_query_result.c.dag_id,
+ final_task_instance_query_result.c.is_dag_running.desc(),
+ )
+ )
+ data = get_task_stats_from_query(qry)
payload = {}
for dag_id in filter_dag_ids:
payload[dag_id] = []
diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py
index b98c1bc..672d4a1 100644
--- a/tests/www/views/test_views.py
+++ b/tests/www/views/test_views.py
@@ -24,7 +24,13 @@ import pytest
from airflow.configuration import initialize_config
from airflow.plugins_manager import AirflowPlugin, EntryPointSource
from airflow.www import views
-from airflow.www.views import get_key_paths, get_safe_url, get_value_from_path, truncate_task_duration
+from airflow.www.views import (
+ get_key_paths,
+ get_safe_url,
+ get_task_stats_from_query,
+ get_value_from_path,
+ truncate_task_duration,
+)
from tests.test_utils.config import conf_vars
from tests.test_utils.mock_plugins import mock_plugin_manager
from tests.test_utils.www import check_content_in_response, check_content_not_in_response
@@ -333,3 +339,30 @@ def test_dag_edit_privileged_requires_view_has_action_decorators(cls: type):
action_funcs = action_funcs - {"action_post"}
for action_function in action_funcs:
assert_decorator_used(cls, action_function, views.action_has_dag_edit_access)
+
+
+def test_get_task_stats_from_query():
+ query_data = [
+ ['dag1', 'queued', True, 1],
+ ['dag1', 'running', True, 2],
+ ['dag1', 'success', False, 3],
+ ['dag2', 'running', True, 4],
+ ['dag2', 'success', True, 5],
+ ['dag3', 'success', False, 6],
+ ]
+ expected_data = {
+ 'dag1': {
+ 'queued': 1,
+ 'running': 2,
+ },
+ 'dag2': {
+ 'running': 4,
+ 'success': 5,
+ },
+ 'dag3': {
+ 'success': 6,
+ },
+ }
+
+ data = get_task_stats_from_query(query_data)
+ assert data == expected_data