You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/08/05 03:41:14 UTC

[airflow] branch main updated: Make extra link work in UI (#25500)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d9e924c058 Make extra link work in UI (#25500)
d9e924c058 is described below

commit d9e924c058f5da9eba5bb5b85a04bfea6fb2471a
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Fri Aug 5 11:41:07 2022 +0800

    Make extra link work in UI (#25500)
---
 airflow/models/abstractoperator.py                 | 29 ++++++++++++++++------
 airflow/models/baseoperator.py                     | 17 +++++--------
 airflow/operators/trigger_dagrun.py                |  8 +-----
 airflow/providers/qubole/operators/qubole.py       |  3 +--
 airflow/www/static/js/dag.js                       |  8 ++++--
 .../static/js/dag/details/taskInstance/index.tsx   | 14 ++++++-----
 docs/spelling_wordlist.txt                         |  1 +
 7 files changed, 44 insertions(+), 36 deletions(-)

diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index c44371b721..50e234def7 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -28,11 +28,14 @@ from typing import (
     FrozenSet,
     Iterable,
     List,
+    MutableMapping,
     Optional,
     Sequence,
     Set,
+    Tuple,
     Type,
     Union,
+    cast,
 )
 
 from airflow.compat.functools import cached_property
@@ -284,6 +287,16 @@ class AbstractOperator(LoggingMixin, DAGNode):
     def extra_links(self) -> List[str]:
         return list(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))
 
+    def unmap(self, resolve: Union[None, Dict[str, Any], Tuple[Context, "Session"]]) -> "BaseOperator":
+        """Get the "normal" operator from current abstract operator.
+
+        MappedOperator uses this to unmap itself based on the map index. A non-
+        mapped operator (i.e. BaseOperator subclass) simply returns itself.
+
+        :meta private:
+        """
+        raise NotImplementedError()
+
     def get_extra_links(self, ti: "TaskInstance", link_name: str) -> Optional[str]:
         """For an operator, gets the URLs that the ``extra_links`` entry points to.
 
@@ -300,13 +313,13 @@ class AbstractOperator(LoggingMixin, DAGNode):
             link = self.global_operator_extra_link_dict.get(link_name)
             if not link:
                 return None
-        # Check for old function signature
+
         parameters = inspect.signature(link.get_link).parameters
-        args = [name for name, p in parameters.items() if p.kind != p.VAR_KEYWORD]
-        if "ti_key" in args:
-            return link.get_link(self, ti_key=ti.key)
-        else:
-            return link.get_link(self, ti.dag_run.logical_date)  # type: ignore[misc]
+        old_signature = all(name != "ti_key" for name, p in parameters.items() if p.kind != p.VAR_KEYWORD)
+
+        if old_signature:
+            return link.get_link(self.unmap(None), ti.dag_run.logical_date)  # type: ignore[misc]
+        return link.get_link(self.unmap(None), ti_key=ti.key)
 
     def render_template_fields(
         self,
@@ -401,8 +414,8 @@ class AbstractOperator(LoggingMixin, DAGNode):
                 template = jinja_env.from_string(value)
             dag = self.get_dag()
             if dag and dag.render_template_as_native_obj:
-                return render_template_as_native(template, context)
-            return render_template_to_string(template, context)
+                return render_template_as_native(template, cast(MutableMapping[str, Any], context))
+            return render_template_to_string(template, cast(MutableMapping[str, Any], context))
 
         if isinstance(value, (DagParam, XComArg)):
             return value.resolve(context)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index f680c1afb0..395e1d3b67 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1508,7 +1508,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         if cls.mapped_arguments_validated_by_init:
             cls(**kwargs, _airflow_from_mapped=True, _airflow_mapped_validation_only=True)
 
-    def unmap(self, ctx: Union[None, Dict[str, Any], Tuple[Context, Session]]) -> "BaseOperator":
+    def unmap(self, resolve: Union[None, Dict[str, Any], Tuple[Context, Session]]) -> "BaseOperator":
         """:meta private:"""
         return self
 
@@ -1762,21 +1762,16 @@ class BaseOperatorLink(metaclass=ABCMeta):
     @property
     @abstractmethod
     def name(self) -> str:
-        """
-        Name of the link. This will be the button name on the task UI.
-
-        :return: link name
-        """
+        """Name of the link. This will be the button name on the task UI."""
 
     @abstractmethod
-    def get_link(self, operator: AbstractOperator, *, ti_key: "TaskInstanceKey") -> str:
-        """
-        Link to external system.
+    def get_link(self, operator: BaseOperator, *, ti_key: "TaskInstanceKey") -> str:
+        """Link to external system.
 
         Note: The old signature of this function was ``(self, operator, dttm: datetime)``. That is still
         supported at runtime but is deprecated.
 
-        :param operator: airflow operator
-        :param ti_key: TaskInstance ID to return link for
+        :param operator: The Airflow operator object this link is associated to.
+        :param ti_key: TaskInstance ID to return link for.
         :return: link to external system
         """
diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index 8045b5796a..9721fb7838 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -39,7 +39,6 @@ XCOM_RUN_ID = "trigger_run_id"
 
 
 if TYPE_CHECKING:
-    from airflow.models.abstractoperator import AbstractOperator
     from airflow.models.taskinstance import TaskInstanceKey
 
 
@@ -51,12 +50,7 @@ class TriggerDagRunLink(BaseOperatorLink):
 
     name = 'Triggered DAG'
 
-    def get_link(
-        self,
-        operator: "AbstractOperator",
-        *,
-        ti_key: "TaskInstanceKey",
-    ) -> str:
+    def get_link(self, operator: BaseOperator, *, ti_key: "TaskInstanceKey") -> str:
         # Fetch the correct execution date for the triggerED dag which is
         # stored in xcom during execution of the triggerING task.
         when = XCom.get_value(ti_key=ti_key, key=XCOM_EXECUTION_DATE_ISO)
diff --git a/airflow/providers/qubole/operators/qubole.py b/airflow/providers/qubole/operators/qubole.py
index 838b5dff69..aa61792151 100644
--- a/airflow/providers/qubole/operators/qubole.py
+++ b/airflow/providers/qubole/operators/qubole.py
@@ -31,7 +31,6 @@ from airflow.providers.qubole.hooks.qubole import (
 )
 
 if TYPE_CHECKING:
-    from airflow.models.abstractoperator import AbstractOperator
     from airflow.models.taskinstance import TaskInstanceKey
     from airflow.utils.context import Context
 
@@ -43,7 +42,7 @@ class QDSLink(BaseOperatorLink):
 
     def get_link(
         self,
-        operator: "AbstractOperator",
+        operator: BaseOperator,
         dttm: Optional[datetime] = None,
         *,
         ti_key: Optional["TaskInstanceKey"] = None,
diff --git a/airflow/www/static/js/dag.js b/airflow/www/static/js/dag.js
index 2a99a0af16..c6d7f4223f 100644
--- a/airflow/www/static/js/dag.js
+++ b/airflow/www/static/js/dag.js
@@ -55,6 +55,7 @@ let subdagId = '';
 let dagRunId = '';
 let mapIndex;
 let mapStates = [];
+let extraLinks;
 const showExternalLogRedirect = getMetaValue('show_external_log_redirect') === 'True';
 
 const buttons = Array.from(document.querySelectorAll('a[id^="btn_"][data-base-url]')).reduce((obj, elm) => {
@@ -144,7 +145,7 @@ document.addEventListener('click', (event) => {
 export function callModal({
   taskId: t,
   executionDate: d,
-  extraLinks,
+  extraLinks: e,
   tryNumber,
   isSubDag,
   dagRunId: drID,
@@ -160,6 +161,7 @@ export function callModal({
   executionDate = d;
   dagRunId = drID;
   mapIndex = mi;
+  extraLinks = e;
   if (isMapped) {
     mapStates = mappedStates;
   }
@@ -269,7 +271,7 @@ export function callModal({
   }
   query.delete('try_number');
 
-  if (extraLinks && extraLinks.length > 0) {
+  if (!isMapped && extraLinks && extraLinks.length > 0) {
     const markupArr = [];
     extraLinks.sort();
     $.each(extraLinks, (i, link) => {
@@ -318,6 +320,7 @@ $(document).on('click', '.map_index_item', function mapItem() {
       taskId,
       executionDate,
       dagRunId,
+      extraLinks,
       mapIndex: -1,
       isMapped: true,
       mappedStates: mapStates,
@@ -327,6 +330,7 @@ $(document).on('click', '.map_index_item', function mapItem() {
       taskId,
       executionDate,
       dagRunId,
+      extraLinks,
       mapIndex: mi,
     });
   }
diff --git a/airflow/www/static/js/dag/details/taskInstance/index.tsx b/airflow/www/static/js/dag/details/taskInstance/index.tsx
index aa4fc67490..a18eb0b9b8 100644
--- a/airflow/www/static/js/dag/details/taskInstance/index.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/index.tsx
@@ -164,12 +164,14 @@ const TaskInstance = ({ taskId, runId }: Props) => {
                 </Box>
               )}
               <Details instance={instance} group={group} />
-              <ExtraLinks
-                taskId={taskId}
-                dagId={dagId || ''}
-                executionDate={executionDate}
-                extraLinks={group?.extraLinks || []}
-              />
+              {!isMapped && (
+                <ExtraLinks
+                  taskId={taskId}
+                  dagId={dagId || ''}
+                  executionDate={executionDate}
+                  extraLinks={group?.extraLinks || []}
+                />
+              )}
               {isMapped && taskId && (
                 <MappedInstances
                   dagId={dagId}
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 321ed5dcec..cb0efadb5c 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1520,6 +1520,7 @@ unittest
 unittests
 unix
 unmanaged
+unmap
 unmappable
 unmapped
 unmapping