You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/10/18 13:10:48 UTC
[airflow] 32/41: Fix auto refresh for graph view (#26926)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 66724540e7e4a0246625a0970b3dee531ec88e6e
Author: pierrejeambrun <pi...@gmail.com>
AuthorDate: Tue Oct 11 17:04:32 2022 +0200
Fix auto refresh for graph view (#26926)
* Fix auto refresh for graph view
* Add task_instances view test
* Use freezegun to mock datetime
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
(cherry picked from commit 64622929a043436b235b9fb61fb076c5d2e02124)
---
airflow/www/static/js/graph.js | 5 +-
tests/www/views/test_views_tasks.py | 271 ++++++++++++++++++++++++++++++++----
2 files changed, 249 insertions(+), 27 deletions(-)
diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js
index 6e34cc2829..715c46b3e2 100644
--- a/airflow/www/static/js/graph.js
+++ b/airflow/www/static/js/graph.js
@@ -434,11 +434,10 @@ function handleRefresh() {
// only refresh if the data has changed
if (prevTis !== tis) {
// eslint-disable-next-line no-global-assign
- taskInstances = JSON.parse(tis);
- updateNodesStates(taskInstances);
+ updateNodesStates(tis);
// Only redraw the graph if labels have changed
- const haveLabelsChanged = updateNodeLabels(nodes, taskInstances);
+ const haveLabelsChanged = updateNodeLabels(nodes, tis);
if (haveLabelsChanged) draw();
// end refresh if all states are final
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 2b6ca3c271..7d1ee8680e 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -24,6 +24,7 @@ import unittest.mock
import urllib.parse
from datetime import timedelta
+import freezegun
import pytest
from airflow import settings
@@ -60,30 +61,31 @@ def reset_dagruns():
@pytest.fixture(autouse=True)
def init_dagruns(app, reset_dagruns):
- app.dag_bag.get_dag("example_bash_operator").create_dagrun(
- run_id=DEFAULT_DAGRUN,
- run_type=DagRunType.SCHEDULED,
- execution_date=DEFAULT_DATE,
- data_interval=(DEFAULT_DATE, DEFAULT_DATE),
- start_date=timezone.utcnow(),
- state=State.RUNNING,
- )
- app.dag_bag.get_dag("example_subdag_operator").create_dagrun(
- run_id=DEFAULT_DAGRUN,
- run_type=DagRunType.SCHEDULED,
- execution_date=DEFAULT_DATE,
- data_interval=(DEFAULT_DATE, DEFAULT_DATE),
- start_date=timezone.utcnow(),
- state=State.RUNNING,
- )
- app.dag_bag.get_dag("example_xcom").create_dagrun(
- run_id=DEFAULT_DAGRUN,
- run_type=DagRunType.SCHEDULED,
- execution_date=DEFAULT_DATE,
- data_interval=(DEFAULT_DATE, DEFAULT_DATE),
- start_date=timezone.utcnow(),
- state=State.RUNNING,
- )
+ with freezegun.freeze_time(DEFAULT_DATE):
+ app.dag_bag.get_dag("example_bash_operator").create_dagrun(
+ run_id=DEFAULT_DAGRUN,
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ start_date=timezone.utcnow(),
+ state=State.RUNNING,
+ )
+ app.dag_bag.get_dag("example_subdag_operator").create_dagrun(
+ run_id=DEFAULT_DAGRUN,
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ start_date=timezone.utcnow(),
+ state=State.RUNNING,
+ )
+ app.dag_bag.get_dag("example_xcom").create_dagrun(
+ run_id=DEFAULT_DAGRUN,
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+ start_date=timezone.utcnow(),
+ state=State.RUNNING,
+ )
yield
clear_db_runs()
@@ -993,3 +995,224 @@ def test_graph_view_doesnt_fail_on_recursion_error(app, dag_maker, admin_client)
url = f'/dags/{dag.dag_id}/graph'
resp = admin_client.get(url, follow_redirects=True)
assert resp.status_code == 200
+
+
+def test_task_instances(admin_client):
+ """Test task_instances view."""
+ resp = admin_client.get(
+ f'/object/task_instances?dag_id=example_bash_operator&execution_date={DEFAULT_DATE}',
+ follow_redirects=True,
+ )
+ assert resp.status_code == 200
+ assert resp.json == {
+ 'also_run_this': {
+ 'dag_id': 'example_bash_operator',
+ 'duration': None,
+ 'end_date': None,
+ 'executor_config': {},
+ 'external_executor_id': None,
+ 'hostname': '',
+ 'job_id': None,
+ 'map_index': -1,
+ 'max_tries': 0,
+ 'next_kwargs': None,
+ 'next_method': None,
+ 'operator': 'BashOperator',
+ 'pid': None,
+ 'pool': 'default_pool',
+ 'pool_slots': 1,
+ 'priority_weight': 2,
+ 'queue': 'default',
+ 'queued_by_job_id': None,
+ 'queued_dttm': None,
+ 'run_id': 'TEST_DAGRUN',
+ 'start_date': None,
+ 'state': None,
+ 'task_id': 'also_run_this',
+ 'trigger_id': None,
+ 'trigger_timeout': None,
+ 'try_number': 1,
+ 'unixname': 'root',
+ 'updated_at': DEFAULT_DATE.isoformat(),
+ },
+ 'run_after_loop': {
+ 'dag_id': 'example_bash_operator',
+ 'duration': None,
+ 'end_date': None,
+ 'executor_config': {},
+ 'external_executor_id': None,
+ 'hostname': '',
+ 'job_id': None,
+ 'map_index': -1,
+ 'max_tries': 0,
+ 'next_kwargs': None,
+ 'next_method': None,
+ 'operator': 'BashOperator',
+ 'pid': None,
+ 'pool': 'default_pool',
+ 'pool_slots': 1,
+ 'priority_weight': 2,
+ 'queue': 'default',
+ 'queued_by_job_id': None,
+ 'queued_dttm': None,
+ 'run_id': 'TEST_DAGRUN',
+ 'start_date': None,
+ 'state': None,
+ 'task_id': 'run_after_loop',
+ 'trigger_id': None,
+ 'trigger_timeout': None,
+ 'try_number': 1,
+ 'unixname': 'root',
+ 'updated_at': DEFAULT_DATE.isoformat(),
+ },
+ 'run_this_last': {
+ 'dag_id': 'example_bash_operator',
+ 'duration': None,
+ 'end_date': None,
+ 'executor_config': {},
+ 'external_executor_id': None,
+ 'hostname': '',
+ 'job_id': None,
+ 'map_index': -1,
+ 'max_tries': 0,
+ 'next_kwargs': None,
+ 'next_method': None,
+ 'operator': 'EmptyOperator',
+ 'pid': None,
+ 'pool': 'default_pool',
+ 'pool_slots': 1,
+ 'priority_weight': 1,
+ 'queue': 'default',
+ 'queued_by_job_id': None,
+ 'queued_dttm': None,
+ 'run_id': 'TEST_DAGRUN',
+ 'start_date': None,
+ 'state': None,
+ 'task_id': 'run_this_last',
+ 'trigger_id': None,
+ 'trigger_timeout': None,
+ 'try_number': 1,
+ 'unixname': 'root',
+ 'updated_at': DEFAULT_DATE.isoformat(),
+ },
+ 'runme_0': {
+ 'dag_id': 'example_bash_operator',
+ 'duration': None,
+ 'end_date': None,
+ 'executor_config': {},
+ 'external_executor_id': None,
+ 'hostname': '',
+ 'job_id': None,
+ 'map_index': -1,
+ 'max_tries': 0,
+ 'next_kwargs': None,
+ 'next_method': None,
+ 'operator': 'BashOperator',
+ 'pid': None,
+ 'pool': 'default_pool',
+ 'pool_slots': 1,
+ 'priority_weight': 3,
+ 'queue': 'default',
+ 'queued_by_job_id': None,
+ 'queued_dttm': None,
+ 'run_id': 'TEST_DAGRUN',
+ 'start_date': None,
+ 'state': None,
+ 'task_id': 'runme_0',
+ 'trigger_id': None,
+ 'trigger_timeout': None,
+ 'try_number': 1,
+ 'unixname': 'root',
+ 'updated_at': DEFAULT_DATE.isoformat(),
+ },
+ 'runme_1': {
+ 'dag_id': 'example_bash_operator',
+ 'duration': None,
+ 'end_date': None,
+ 'executor_config': {},
+ 'external_executor_id': None,
+ 'hostname': '',
+ 'job_id': None,
+ 'map_index': -1,
+ 'max_tries': 0,
+ 'next_kwargs': None,
+ 'next_method': None,
+ 'operator': 'BashOperator',
+ 'pid': None,
+ 'pool': 'default_pool',
+ 'pool_slots': 1,
+ 'priority_weight': 3,
+ 'queue': 'default',
+ 'queued_by_job_id': None,
+ 'queued_dttm': None,
+ 'run_id': 'TEST_DAGRUN',
+ 'start_date': None,
+ 'state': None,
+ 'task_id': 'runme_1',
+ 'trigger_id': None,
+ 'trigger_timeout': None,
+ 'try_number': 1,
+ 'unixname': 'root',
+ 'updated_at': DEFAULT_DATE.isoformat(),
+ },
+ 'runme_2': {
+ 'dag_id': 'example_bash_operator',
+ 'duration': None,
+ 'end_date': None,
+ 'executor_config': {},
+ 'external_executor_id': None,
+ 'hostname': '',
+ 'job_id': None,
+ 'map_index': -1,
+ 'max_tries': 0,
+ 'next_kwargs': None,
+ 'next_method': None,
+ 'operator': 'BashOperator',
+ 'pid': None,
+ 'pool': 'default_pool',
+ 'pool_slots': 1,
+ 'priority_weight': 3,
+ 'queue': 'default',
+ 'queued_by_job_id': None,
+ 'queued_dttm': None,
+ 'run_id': 'TEST_DAGRUN',
+ 'start_date': None,
+ 'state': None,
+ 'task_id': 'runme_2',
+ 'trigger_id': None,
+ 'trigger_timeout': None,
+ 'try_number': 1,
+ 'unixname': 'root',
+ 'updated_at': DEFAULT_DATE.isoformat(),
+ },
+ 'this_will_skip': {
+ 'dag_id': 'example_bash_operator',
+ 'duration': None,
+ 'end_date': None,
+ 'executor_config': {},
+ 'external_executor_id': None,
+ 'hostname': '',
+ 'job_id': None,
+ 'map_index': -1,
+ 'max_tries': 0,
+ 'next_kwargs': None,
+ 'next_method': None,
+ 'operator': 'BashOperator',
+ 'pid': None,
+ 'pool': 'default_pool',
+ 'pool_slots': 1,
+ 'priority_weight': 2,
+ 'queue': 'default',
+ 'queued_by_job_id': None,
+ 'queued_dttm': None,
+ 'run_id': 'TEST_DAGRUN',
+ 'start_date': None,
+ 'state': None,
+ 'task_id': 'this_will_skip',
+ 'trigger_id': None,
+ 'trigger_timeout': None,
+ 'try_number': 1,
+ 'unixname': 'root',
+ 'updated_at': DEFAULT_DATE.isoformat(),
+ },
+ }