You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/10/18 13:10:48 UTC

[airflow] 32/41: Fix auto refresh for graph view (#26926)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 66724540e7e4a0246625a0970b3dee531ec88e6e
Author: pierrejeambrun <pi...@gmail.com>
AuthorDate: Tue Oct 11 17:04:32 2022 +0200

    Fix auto refresh for graph view (#26926)
    
    * Fix auto refresh for graph view
    
    * Add task_instances view test
    
    * Use freezegun to mock datetime
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit 64622929a043436b235b9fb61fb076c5d2e02124)
---
 airflow/www/static/js/graph.js      |   5 +-
 tests/www/views/test_views_tasks.py | 271 ++++++++++++++++++++++++++++++++----
 2 files changed, 249 insertions(+), 27 deletions(-)

diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js
index 6e34cc2829..715c46b3e2 100644
--- a/airflow/www/static/js/graph.js
+++ b/airflow/www/static/js/graph.js
@@ -434,11 +434,10 @@ function handleRefresh() {
         // only refresh if the data has changed
         if (prevTis !== tis) {
         // eslint-disable-next-line no-global-assign
-          taskInstances = JSON.parse(tis);
-          updateNodesStates(taskInstances);
+          updateNodesStates(tis);
 
           // Only redraw the graph if labels have changed
-          const haveLabelsChanged = updateNodeLabels(nodes, taskInstances);
+          const haveLabelsChanged = updateNodeLabels(nodes, tis);
           if (haveLabelsChanged) draw();
 
           // end refresh if all states are final
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 2b6ca3c271..7d1ee8680e 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -24,6 +24,7 @@ import unittest.mock
 import urllib.parse
 from datetime import timedelta
 
+import freezegun
 import pytest
 
 from airflow import settings
@@ -60,30 +61,31 @@ def reset_dagruns():
 
 @pytest.fixture(autouse=True)
 def init_dagruns(app, reset_dagruns):
-    app.dag_bag.get_dag("example_bash_operator").create_dagrun(
-        run_id=DEFAULT_DAGRUN,
-        run_type=DagRunType.SCHEDULED,
-        execution_date=DEFAULT_DATE,
-        data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-        start_date=timezone.utcnow(),
-        state=State.RUNNING,
-    )
-    app.dag_bag.get_dag("example_subdag_operator").create_dagrun(
-        run_id=DEFAULT_DAGRUN,
-        run_type=DagRunType.SCHEDULED,
-        execution_date=DEFAULT_DATE,
-        data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-        start_date=timezone.utcnow(),
-        state=State.RUNNING,
-    )
-    app.dag_bag.get_dag("example_xcom").create_dagrun(
-        run_id=DEFAULT_DAGRUN,
-        run_type=DagRunType.SCHEDULED,
-        execution_date=DEFAULT_DATE,
-        data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-        start_date=timezone.utcnow(),
-        state=State.RUNNING,
-    )
+    with freezegun.freeze_time(DEFAULT_DATE):
+        app.dag_bag.get_dag("example_bash_operator").create_dagrun(
+            run_id=DEFAULT_DAGRUN,
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+        )
+        app.dag_bag.get_dag("example_subdag_operator").create_dagrun(
+            run_id=DEFAULT_DAGRUN,
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+        )
+        app.dag_bag.get_dag("example_xcom").create_dagrun(
+            run_id=DEFAULT_DAGRUN,
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+        )
     yield
     clear_db_runs()
 
@@ -993,3 +995,224 @@ def test_graph_view_doesnt_fail_on_recursion_error(app, dag_maker, admin_client)
         url = f'/dags/{dag.dag_id}/graph'
         resp = admin_client.get(url, follow_redirects=True)
         assert resp.status_code == 200
+
+
+def test_task_instances(admin_client):
+    """Test task_instances view."""
+    resp = admin_client.get(
+        f'/object/task_instances?dag_id=example_bash_operator&execution_date={DEFAULT_DATE}',
+        follow_redirects=True,
+    )
+    assert resp.status_code == 200
+    assert resp.json == {
+        'also_run_this': {
+            'dag_id': 'example_bash_operator',
+            'duration': None,
+            'end_date': None,
+            'executor_config': {},
+            'external_executor_id': None,
+            'hostname': '',
+            'job_id': None,
+            'map_index': -1,
+            'max_tries': 0,
+            'next_kwargs': None,
+            'next_method': None,
+            'operator': 'BashOperator',
+            'pid': None,
+            'pool': 'default_pool',
+            'pool_slots': 1,
+            'priority_weight': 2,
+            'queue': 'default',
+            'queued_by_job_id': None,
+            'queued_dttm': None,
+            'run_id': 'TEST_DAGRUN',
+            'start_date': None,
+            'state': None,
+            'task_id': 'also_run_this',
+            'trigger_id': None,
+            'trigger_timeout': None,
+            'try_number': 1,
+            'unixname': 'root',
+            'updated_at': DEFAULT_DATE.isoformat(),
+        },
+        'run_after_loop': {
+            'dag_id': 'example_bash_operator',
+            'duration': None,
+            'end_date': None,
+            'executor_config': {},
+            'external_executor_id': None,
+            'hostname': '',
+            'job_id': None,
+            'map_index': -1,
+            'max_tries': 0,
+            'next_kwargs': None,
+            'next_method': None,
+            'operator': 'BashOperator',
+            'pid': None,
+            'pool': 'default_pool',
+            'pool_slots': 1,
+            'priority_weight': 2,
+            'queue': 'default',
+            'queued_by_job_id': None,
+            'queued_dttm': None,
+            'run_id': 'TEST_DAGRUN',
+            'start_date': None,
+            'state': None,
+            'task_id': 'run_after_loop',
+            'trigger_id': None,
+            'trigger_timeout': None,
+            'try_number': 1,
+            'unixname': 'root',
+            'updated_at': DEFAULT_DATE.isoformat(),
+        },
+        'run_this_last': {
+            'dag_id': 'example_bash_operator',
+            'duration': None,
+            'end_date': None,
+            'executor_config': {},
+            'external_executor_id': None,
+            'hostname': '',
+            'job_id': None,
+            'map_index': -1,
+            'max_tries': 0,
+            'next_kwargs': None,
+            'next_method': None,
+            'operator': 'EmptyOperator',
+            'pid': None,
+            'pool': 'default_pool',
+            'pool_slots': 1,
+            'priority_weight': 1,
+            'queue': 'default',
+            'queued_by_job_id': None,
+            'queued_dttm': None,
+            'run_id': 'TEST_DAGRUN',
+            'start_date': None,
+            'state': None,
+            'task_id': 'run_this_last',
+            'trigger_id': None,
+            'trigger_timeout': None,
+            'try_number': 1,
+            'unixname': 'root',
+            'updated_at': DEFAULT_DATE.isoformat(),
+        },
+        'runme_0': {
+            'dag_id': 'example_bash_operator',
+            'duration': None,
+            'end_date': None,
+            'executor_config': {},
+            'external_executor_id': None,
+            'hostname': '',
+            'job_id': None,
+            'map_index': -1,
+            'max_tries': 0,
+            'next_kwargs': None,
+            'next_method': None,
+            'operator': 'BashOperator',
+            'pid': None,
+            'pool': 'default_pool',
+            'pool_slots': 1,
+            'priority_weight': 3,
+            'queue': 'default',
+            'queued_by_job_id': None,
+            'queued_dttm': None,
+            'run_id': 'TEST_DAGRUN',
+            'start_date': None,
+            'state': None,
+            'task_id': 'runme_0',
+            'trigger_id': None,
+            'trigger_timeout': None,
+            'try_number': 1,
+            'unixname': 'root',
+            'updated_at': DEFAULT_DATE.isoformat(),
+        },
+        'runme_1': {
+            'dag_id': 'example_bash_operator',
+            'duration': None,
+            'end_date': None,
+            'executor_config': {},
+            'external_executor_id': None,
+            'hostname': '',
+            'job_id': None,
+            'map_index': -1,
+            'max_tries': 0,
+            'next_kwargs': None,
+            'next_method': None,
+            'operator': 'BashOperator',
+            'pid': None,
+            'pool': 'default_pool',
+            'pool_slots': 1,
+            'priority_weight': 3,
+            'queue': 'default',
+            'queued_by_job_id': None,
+            'queued_dttm': None,
+            'run_id': 'TEST_DAGRUN',
+            'start_date': None,
+            'state': None,
+            'task_id': 'runme_1',
+            'trigger_id': None,
+            'trigger_timeout': None,
+            'try_number': 1,
+            'unixname': 'root',
+            'updated_at': DEFAULT_DATE.isoformat(),
+        },
+        'runme_2': {
+            'dag_id': 'example_bash_operator',
+            'duration': None,
+            'end_date': None,
+            'executor_config': {},
+            'external_executor_id': None,
+            'hostname': '',
+            'job_id': None,
+            'map_index': -1,
+            'max_tries': 0,
+            'next_kwargs': None,
+            'next_method': None,
+            'operator': 'BashOperator',
+            'pid': None,
+            'pool': 'default_pool',
+            'pool_slots': 1,
+            'priority_weight': 3,
+            'queue': 'default',
+            'queued_by_job_id': None,
+            'queued_dttm': None,
+            'run_id': 'TEST_DAGRUN',
+            'start_date': None,
+            'state': None,
+            'task_id': 'runme_2',
+            'trigger_id': None,
+            'trigger_timeout': None,
+            'try_number': 1,
+            'unixname': 'root',
+            'updated_at': DEFAULT_DATE.isoformat(),
+        },
+        'this_will_skip': {
+            'dag_id': 'example_bash_operator',
+            'duration': None,
+            'end_date': None,
+            'executor_config': {},
+            'external_executor_id': None,
+            'hostname': '',
+            'job_id': None,
+            'map_index': -1,
+            'max_tries': 0,
+            'next_kwargs': None,
+            'next_method': None,
+            'operator': 'BashOperator',
+            'pid': None,
+            'pool': 'default_pool',
+            'pool_slots': 1,
+            'priority_weight': 2,
+            'queue': 'default',
+            'queued_by_job_id': None,
+            'queued_dttm': None,
+            'run_id': 'TEST_DAGRUN',
+            'start_date': None,
+            'state': None,
+            'task_id': 'this_will_skip',
+            'trigger_id': None,
+            'trigger_timeout': None,
+            'try_number': 1,
+            'unixname': 'root',
+            'updated_at': DEFAULT_DATE.isoformat(),
+        },
+    }