You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/11/11 22:13:17 UTC

[airflow] branch master updated: Use default view in TriggerDagRunLink (#11778)

This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 289c9b5  Use default view in TriggerDagRunLink (#11778)
289c9b5 is described below

commit 289c9b5a994a3e26951ca23b6edd30b2329b3089
Author: Tomek Urbaszek <tu...@gmail.com>
AuthorDate: Wed Nov 11 23:11:53 2020 +0100

    Use default view in TriggerDagRunLink (#11778)
---
 airflow/operators/dagrun_operator.py    |  5 +++--
 airflow/sensors/external_task_sensor.py |  5 +++--
 airflow/utils/helpers.py                | 12 ++++++++++++
 tests/utils/test_helpers.py             | 13 ++++++++++++-
 4 files changed, 30 insertions(+), 5 deletions(-)

diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index 0ca0cf5..7547a0f 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -18,13 +18,13 @@
 
 import datetime
 from typing import Dict, Optional, Union
-from urllib.parse import quote
 
 from airflow.api.common.experimental.trigger_dag import trigger_dag
 from airflow.exceptions import DagNotFound, DagRunAlreadyExists
 from airflow.models import BaseOperator, BaseOperatorLink, DagBag, DagModel, DagRun
 from airflow.utils import timezone
 from airflow.utils.decorators import apply_defaults
+from airflow.utils.helpers import build_airflow_url_with_query
 from airflow.utils.types import DagRunType
 
 
@@ -37,7 +37,8 @@ class TriggerDagRunLink(BaseOperatorLink):
     name = 'Triggered DAG'
 
     def get_link(self, operator, dttm):
-        return f"/graph?dag_id={operator.trigger_dag_id}&root=&execution_date={quote(dttm.isoformat())}"
+        query = {"dag_id": operator.trigger_dag_id, "execution_date": dttm.isoformat()}
+        return build_airflow_url_with_query(query)
 
 
 class TriggerDagRunOperator(BaseOperator):
diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py
index 06137a4..c72c0b7e 100644
--- a/airflow/sensors/external_task_sensor.py
+++ b/airflow/sensors/external_task_sensor.py
@@ -19,7 +19,6 @@
 import datetime
 import os
 from typing import FrozenSet, Optional, Union
-from urllib.parse import quote
 
 from sqlalchemy import func
 
@@ -28,6 +27,7 @@ from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInsta
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
+from airflow.utils.helpers import build_airflow_url_with_query
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
@@ -41,7 +41,8 @@ class ExternalTaskSensorLink(BaseOperatorLink):
     name = 'External DAG'
 
     def get_link(self, operator, dttm):
-        return f"/graph?dag_id={operator.external_dag_id}&root=&execution_date={quote(dttm.isoformat())}"
+        query = {"dag_id": operator.external_dag_id, "execution_date": dttm.isoformat()}
+        return build_airflow_url_with_query(query)
 
 
 class ExternalTaskSensor(BaseSensorOperator):
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 5ccb618..69ac5a0 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -22,9 +22,11 @@ from datetime import datetime
 from functools import reduce
 from itertools import filterfalse, tee
 from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, TypeVar
+from urllib import parse
 
 from jinja2 import Template
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.utils.module_loading import import_string
 
@@ -202,3 +204,13 @@ def cross_downstream(*args, **kwargs):
         stacklevel=2,
     )
     return import_string('airflow.models.baseoperator.cross_downstream')(*args, **kwargs)
+
+
+def build_airflow_url_with_query(query: Dict[str, Any]) -> str:
+    """
+    Build airflow url using base_url and default_view and provided query
+    For example:
+    'http://0.0.0.0:8000/base/graph?dag_id=my-task&root=&execution_date=2020-10-27T10%3A59%3A25.615587
+    """
+    view = conf.get('webserver', 'dag_default_view').lower()
+    return f"/{view}?{parse.urlencode(query)}"
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py
index 85c53c5..c53aa4f 100644
--- a/tests/utils/test_helpers.py
+++ b/tests/utils/test_helpers.py
@@ -23,7 +23,8 @@ from airflow.models import TaskInstance
 from airflow.models.dag import DAG
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.utils import helpers
-from airflow.utils.helpers import merge_dicts
+from airflow.utils.helpers import build_airflow_url_with_query, merge_dicts
+from tests.test_utils.config import conf_vars
 
 
 class TestHelpers(unittest.TestCase):
@@ -136,3 +137,13 @@ class TestHelpers(unittest.TestCase):
         dict2 = {'a': 1, 'r': {'c': 3, 'b': 0}}
         merged = merge_dicts(dict1, dict2)
         self.assertDictEqual(merged, {'a': 1, 'r': {'b': 0, 'c': 3}})
+
+    @conf_vars(
+        {
+            ("webserver", "dag_default_view"): "custom",
+        }
+    )
+    def test_build_airflow_url_with_query(self):
+        query = {"dag_id": "test_dag", "param": "key/to.encode"}
+        url = build_airflow_url_with_query(query)
+        assert url == "/custom?dag_id=test_dag&param=key%2Fto.encode"