You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/11/11 22:13:17 UTC
[airflow] branch master updated: Use default view in TriggerDagRunLink (#11778)
This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 289c9b5 Use default view in TriggerDagRunLink (#11778)
289c9b5 is described below
commit 289c9b5a994a3e26951ca23b6edd30b2329b3089
Author: Tomek Urbaszek <tu...@gmail.com>
AuthorDate: Wed Nov 11 23:11:53 2020 +0100
Use default view in TriggerDagRunLink (#11778)
---
airflow/operators/dagrun_operator.py | 5 +++--
airflow/sensors/external_task_sensor.py | 5 +++--
airflow/utils/helpers.py | 12 ++++++++++++
tests/utils/test_helpers.py | 13 ++++++++++++-
4 files changed, 30 insertions(+), 5 deletions(-)
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index 0ca0cf5..7547a0f 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -18,13 +18,13 @@
import datetime
from typing import Dict, Optional, Union
-from urllib.parse import quote
from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.exceptions import DagNotFound, DagRunAlreadyExists
from airflow.models import BaseOperator, BaseOperatorLink, DagBag, DagModel, DagRun
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
+from airflow.utils.helpers import build_airflow_url_with_query
from airflow.utils.types import DagRunType
@@ -37,7 +37,8 @@ class TriggerDagRunLink(BaseOperatorLink):
name = 'Triggered DAG'
def get_link(self, operator, dttm):
- return f"/graph?dag_id={operator.trigger_dag_id}&root=&execution_date={quote(dttm.isoformat())}"
+ query = {"dag_id": operator.trigger_dag_id, "execution_date": dttm.isoformat()}
+ return build_airflow_url_with_query(query)
class TriggerDagRunOperator(BaseOperator):
diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py
index 06137a4..c72c0b7e 100644
--- a/airflow/sensors/external_task_sensor.py
+++ b/airflow/sensors/external_task_sensor.py
@@ -19,7 +19,6 @@
import datetime
import os
from typing import FrozenSet, Optional, Union
-from urllib.parse import quote
from sqlalchemy import func
@@ -28,6 +27,7 @@ from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInsta
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
+from airflow.utils.helpers import build_airflow_url_with_query
from airflow.utils.session import provide_session
from airflow.utils.state import State
@@ -41,7 +41,8 @@ class ExternalTaskSensorLink(BaseOperatorLink):
name = 'External DAG'
def get_link(self, operator, dttm):
- return f"/graph?dag_id={operator.external_dag_id}&root=&execution_date={quote(dttm.isoformat())}"
+ query = {"dag_id": operator.external_dag_id, "execution_date": dttm.isoformat()}
+ return build_airflow_url_with_query(query)
class ExternalTaskSensor(BaseSensorOperator):
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 5ccb618..69ac5a0 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -22,9 +22,11 @@ from datetime import datetime
from functools import reduce
from itertools import filterfalse, tee
from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, TypeVar
+from urllib import parse
from jinja2 import Template
+from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.utils.module_loading import import_string
@@ -202,3 +204,13 @@ def cross_downstream(*args, **kwargs):
stacklevel=2,
)
return import_string('airflow.models.baseoperator.cross_downstream')(*args, **kwargs)
+
+
+def build_airflow_url_with_query(query: Dict[str, Any]) -> str:
+ """
+ Build airflow url using base_url and default_view and provided query
+ For example:
+ 'http://0.0.0.0:8000/base/graph?dag_id=my-task&root=&execution_date=2020-10-27T10%3A59%3A25.615587
+ """
+ view = conf.get('webserver', 'dag_default_view').lower()
+ return f"/{view}?{parse.urlencode(query)}"
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py
index 85c53c5..c53aa4f 100644
--- a/tests/utils/test_helpers.py
+++ b/tests/utils/test_helpers.py
@@ -23,7 +23,8 @@ from airflow.models import TaskInstance
from airflow.models.dag import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils import helpers
-from airflow.utils.helpers import merge_dicts
+from airflow.utils.helpers import build_airflow_url_with_query, merge_dicts
+from tests.test_utils.config import conf_vars
class TestHelpers(unittest.TestCase):
@@ -136,3 +137,13 @@ class TestHelpers(unittest.TestCase):
dict2 = {'a': 1, 'r': {'c': 3, 'b': 0}}
merged = merge_dicts(dict1, dict2)
self.assertDictEqual(merged, {'a': 1, 'r': {'b': 0, 'c': 3}})
+
+ @conf_vars(
+ {
+ ("webserver", "dag_default_view"): "custom",
+ }
+ )
+ def test_build_airflow_url_with_query(self):
+ query = {"dag_id": "test_dag", "param": "key/to.encode"}
+ url = build_airflow_url_with_query(query)
+ assert url == "/custom?dag_id=test_dag&param=key%2Fto.encode"