You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/08/05 03:41:14 UTC
[airflow] branch main updated: Make extra link work in UI (#25500)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d9e924c058 Make extra link work in UI (#25500)
d9e924c058 is described below
commit d9e924c058f5da9eba5bb5b85a04bfea6fb2471a
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Fri Aug 5 11:41:07 2022 +0800
Make extra link work in UI (#25500)
---
airflow/models/abstractoperator.py | 29 ++++++++++++++++------
airflow/models/baseoperator.py | 17 +++++--------
airflow/operators/trigger_dagrun.py | 8 +-----
airflow/providers/qubole/operators/qubole.py | 3 +--
airflow/www/static/js/dag.js | 8 ++++--
.../static/js/dag/details/taskInstance/index.tsx | 14 ++++++-----
docs/spelling_wordlist.txt | 1 +
7 files changed, 44 insertions(+), 36 deletions(-)
diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index c44371b721..50e234def7 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -28,11 +28,14 @@ from typing import (
FrozenSet,
Iterable,
List,
+ MutableMapping,
Optional,
Sequence,
Set,
+ Tuple,
Type,
Union,
+ cast,
)
from airflow.compat.functools import cached_property
@@ -284,6 +287,16 @@ class AbstractOperator(LoggingMixin, DAGNode):
def extra_links(self) -> List[str]:
return list(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))
+ def unmap(self, resolve: Union[None, Dict[str, Any], Tuple[Context, "Session"]]) -> "BaseOperator":
+ """Get the "normal" operator from current abstract operator.
+
+ MappedOperator uses this to unmap itself based on the map index. A non-
+ mapped operator (i.e. BaseOperator subclass) simply returns itself.
+
+ :meta private:
+ """
+ raise NotImplementedError()
+
def get_extra_links(self, ti: "TaskInstance", link_name: str) -> Optional[str]:
"""For an operator, gets the URLs that the ``extra_links`` entry points to.
@@ -300,13 +313,13 @@ class AbstractOperator(LoggingMixin, DAGNode):
link = self.global_operator_extra_link_dict.get(link_name)
if not link:
return None
- # Check for old function signature
+
parameters = inspect.signature(link.get_link).parameters
- args = [name for name, p in parameters.items() if p.kind != p.VAR_KEYWORD]
- if "ti_key" in args:
- return link.get_link(self, ti_key=ti.key)
- else:
- return link.get_link(self, ti.dag_run.logical_date) # type: ignore[misc]
+ old_signature = all(name != "ti_key" for name, p in parameters.items() if p.kind != p.VAR_KEYWORD)
+
+ if old_signature:
+ return link.get_link(self.unmap(None), ti.dag_run.logical_date) # type: ignore[misc]
+ return link.get_link(self.unmap(None), ti_key=ti.key)
def render_template_fields(
self,
@@ -401,8 +414,8 @@ class AbstractOperator(LoggingMixin, DAGNode):
template = jinja_env.from_string(value)
dag = self.get_dag()
if dag and dag.render_template_as_native_obj:
- return render_template_as_native(template, context)
- return render_template_to_string(template, context)
+ return render_template_as_native(template, cast(MutableMapping[str, Any], context))
+ return render_template_to_string(template, cast(MutableMapping[str, Any], context))
if isinstance(value, (DagParam, XComArg)):
return value.resolve(context)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index f680c1afb0..395e1d3b67 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1508,7 +1508,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
if cls.mapped_arguments_validated_by_init:
cls(**kwargs, _airflow_from_mapped=True, _airflow_mapped_validation_only=True)
- def unmap(self, ctx: Union[None, Dict[str, Any], Tuple[Context, Session]]) -> "BaseOperator":
+ def unmap(self, resolve: Union[None, Dict[str, Any], Tuple[Context, Session]]) -> "BaseOperator":
""":meta private:"""
return self
@@ -1762,21 +1762,16 @@ class BaseOperatorLink(metaclass=ABCMeta):
@property
@abstractmethod
def name(self) -> str:
- """
- Name of the link. This will be the button name on the task UI.
-
- :return: link name
- """
+ """Name of the link. This will be the button name on the task UI."""
@abstractmethod
- def get_link(self, operator: AbstractOperator, *, ti_key: "TaskInstanceKey") -> str:
- """
- Link to external system.
+ def get_link(self, operator: BaseOperator, *, ti_key: "TaskInstanceKey") -> str:
+ """Link to external system.
Note: The old signature of this function was ``(self, operator, dttm: datetime)``. That is still
supported at runtime but is deprecated.
- :param operator: airflow operator
- :param ti_key: TaskInstance ID to return link for
+ :param operator: The Airflow operator object this link is associated to.
+ :param ti_key: TaskInstance ID to return link for.
:return: link to external system
"""
diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index 8045b5796a..9721fb7838 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -39,7 +39,6 @@ XCOM_RUN_ID = "trigger_run_id"
if TYPE_CHECKING:
- from airflow.models.abstractoperator import AbstractOperator
from airflow.models.taskinstance import TaskInstanceKey
@@ -51,12 +50,7 @@ class TriggerDagRunLink(BaseOperatorLink):
name = 'Triggered DAG'
- def get_link(
- self,
- operator: "AbstractOperator",
- *,
- ti_key: "TaskInstanceKey",
- ) -> str:
+ def get_link(self, operator: BaseOperator, *, ti_key: "TaskInstanceKey") -> str:
# Fetch the correct execution date for the triggerED dag which is
# stored in xcom during execution of the triggerING task.
when = XCom.get_value(ti_key=ti_key, key=XCOM_EXECUTION_DATE_ISO)
diff --git a/airflow/providers/qubole/operators/qubole.py b/airflow/providers/qubole/operators/qubole.py
index 838b5dff69..aa61792151 100644
--- a/airflow/providers/qubole/operators/qubole.py
+++ b/airflow/providers/qubole/operators/qubole.py
@@ -31,7 +31,6 @@ from airflow.providers.qubole.hooks.qubole import (
)
if TYPE_CHECKING:
- from airflow.models.abstractoperator import AbstractOperator
from airflow.models.taskinstance import TaskInstanceKey
from airflow.utils.context import Context
@@ -43,7 +42,7 @@ class QDSLink(BaseOperatorLink):
def get_link(
self,
- operator: "AbstractOperator",
+ operator: BaseOperator,
dttm: Optional[datetime] = None,
*,
ti_key: Optional["TaskInstanceKey"] = None,
diff --git a/airflow/www/static/js/dag.js b/airflow/www/static/js/dag.js
index 2a99a0af16..c6d7f4223f 100644
--- a/airflow/www/static/js/dag.js
+++ b/airflow/www/static/js/dag.js
@@ -55,6 +55,7 @@ let subdagId = '';
let dagRunId = '';
let mapIndex;
let mapStates = [];
+let extraLinks;
const showExternalLogRedirect = getMetaValue('show_external_log_redirect') === 'True';
const buttons = Array.from(document.querySelectorAll('a[id^="btn_"][data-base-url]')).reduce((obj, elm) => {
@@ -144,7 +145,7 @@ document.addEventListener('click', (event) => {
export function callModal({
taskId: t,
executionDate: d,
- extraLinks,
+ extraLinks: e,
tryNumber,
isSubDag,
dagRunId: drID,
@@ -160,6 +161,7 @@ export function callModal({
executionDate = d;
dagRunId = drID;
mapIndex = mi;
+ extraLinks = e;
if (isMapped) {
mapStates = mappedStates;
}
@@ -269,7 +271,7 @@ export function callModal({
}
query.delete('try_number');
- if (extraLinks && extraLinks.length > 0) {
+ if (!isMapped && extraLinks && extraLinks.length > 0) {
const markupArr = [];
extraLinks.sort();
$.each(extraLinks, (i, link) => {
@@ -318,6 +320,7 @@ $(document).on('click', '.map_index_item', function mapItem() {
taskId,
executionDate,
dagRunId,
+ extraLinks,
mapIndex: -1,
isMapped: true,
mappedStates: mapStates,
@@ -327,6 +330,7 @@ $(document).on('click', '.map_index_item', function mapItem() {
taskId,
executionDate,
dagRunId,
+ extraLinks,
mapIndex: mi,
});
}
diff --git a/airflow/www/static/js/dag/details/taskInstance/index.tsx b/airflow/www/static/js/dag/details/taskInstance/index.tsx
index aa4fc67490..a18eb0b9b8 100644
--- a/airflow/www/static/js/dag/details/taskInstance/index.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/index.tsx
@@ -164,12 +164,14 @@ const TaskInstance = ({ taskId, runId }: Props) => {
</Box>
)}
<Details instance={instance} group={group} />
- <ExtraLinks
- taskId={taskId}
- dagId={dagId || ''}
- executionDate={executionDate}
- extraLinks={group?.extraLinks || []}
- />
+ {!isMapped && (
+ <ExtraLinks
+ taskId={taskId}
+ dagId={dagId || ''}
+ executionDate={executionDate}
+ extraLinks={group?.extraLinks || []}
+ />
+ )}
{isMapped && taskId && (
<MappedInstances
dagId={dagId}
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 321ed5dcec..cb0efadb5c 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1520,6 +1520,7 @@ unittest
unittests
unix
unmanaged
+unmap
unmappable
unmapped
unmapping