You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/28 13:35:28 UTC

[GitHub] [airflow] uranusjr commented on a change in pull request #21798: Change BaseOperatorLink interface to take a ti_key, not a datetime

uranusjr commented on a change in pull request #21798:
URL: https://github.com/apache/airflow/pull/21798#discussion_r815885054



##########
File path: airflow/models/abstractoperator.py
##########
@@ -239,19 +242,27 @@ def global_operator_extra_link_dict(self) -> Dict[str, Any]:
     def extra_links(self) -> List[str]:
         return list(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))
 
-    def get_extra_links(self, dttm: datetime.datetime, link_name: str) -> Optional[Dict[str, Any]]:
+    def get_extra_links(self, ti: "TaskInstance", link_name: str) -> Optional[str]:

Review comment:
       Does this need to be considered public API?

##########
File path: airflow/models/abstractoperator.py
##########
@@ -394,3 +405,7 @@ def _render_nested_template_fields(
             # content has no inner template fields
             return
         self._do_render_template_fields(value, nested_template_fields, context, jinja_env, seen_oids)
+
+
+if TYPE_CHECKING:
+    from airflow.models.baseoperator import BaseOperator, BaseOperatorLink

Review comment:
       Why are these not at the top?

##########
File path: airflow/models/abstractoperator.py
##########
@@ -239,19 +242,27 @@ def global_operator_extra_link_dict(self) -> Dict[str, Any]:
     def extra_links(self) -> List[str]:
         return list(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))
 
-    def get_extra_links(self, dttm: datetime.datetime, link_name: str) -> Optional[Dict[str, Any]]:
+    def get_extra_links(self, ti: "TaskInstance", link_name: str) -> Optional[str]:
         """For an operator, gets the URLs that the ``extra_links`` entry points to.
 
         :raise ValueError: The error message of a ValueError will be passed on through to
             the fronted to show up as a tooltip on the disabled link.
-        :param dttm: The datetime parsed execution date for the URL being searched for.
+        :param ti: The TaskInstance for the URL being searched for.
         :param link_name: The name of the link we're looking for the URL for. Should be
             one of the options specified in ``extra_links``.
         """
-        if link_name in self.operator_extra_link_dict:
-            return self.operator_extra_link_dict[link_name].get_link(self, dttm)
-        elif link_name in self.global_operator_extra_link_dict:
-            return self.global_operator_extra_link_dict[link_name].get_link(self, dttm)
+        link: Optional["BaseOperatorLink"] = self.operator_extra_link_dict.get(
+            link_name
+        ) or self.global_operator_extra_link_dict.get(link_name)

Review comment:
       ```suggestion
           link: Optional["BaseOperatorLink"] = (
               self.operator_extra_link_dict.get(link_name)
               or self.global_operator_extra_link_dict.get(link_name)
           )
   ```
   
   Would Black be happy with this? The current format is… bad…

##########
File path: airflow/models/baseoperator.py
##########
@@ -1730,11 +1731,17 @@ def name(self) -> str:
         """
 
     @abstractmethod
-    def get_link(self, operator: BaseOperator, dttm: datetime) -> str:
+    def get_link(
+        self,
+        operator: AbstractOperator,
+        dttm: Optional[datetime] = None,
+        *,
+        ti_key: Optional["TaskInstanceKey"] = None,
+    ) -> str:

Review comment:
       `@overload` this?

##########
File path: airflow/models/abstractoperator.py
##########
@@ -239,19 +242,27 @@ def global_operator_extra_link_dict(self) -> Dict[str, Any]:
     def extra_links(self) -> List[str]:
         return list(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))
 
-    def get_extra_links(self, dttm: datetime.datetime, link_name: str) -> Optional[Dict[str, Any]]:
+    def get_extra_links(self, ti: "TaskInstance", link_name: str) -> Optional[str]:
         """For an operator, gets the URLs that the ``extra_links`` entry points to.
 
         :raise ValueError: The error message of a ValueError will be passed on through to
             the fronted to show up as a tooltip on the disabled link.
-        :param dttm: The datetime parsed execution date for the URL being searched for.
+        :param ti: The TaskInstance for the URL being searched for.
         :param link_name: The name of the link we're looking for the URL for. Should be
             one of the options specified in ``extra_links``.
         """
-        if link_name in self.operator_extra_link_dict:
-            return self.operator_extra_link_dict[link_name].get_link(self, dttm)
-        elif link_name in self.global_operator_extra_link_dict:
-            return self.global_operator_extra_link_dict[link_name].get_link(self, dttm)
+        link: Optional["BaseOperatorLink"] = self.operator_extra_link_dict.get(
+            link_name
+        ) or self.global_operator_extra_link_dict.get(link_name)
+        if not link:
+            return None
+        # Check for old function signature
+        parameters = inspect.signature(link.get_link).parameters
+        args = [name for name, p in parameters.items() if p.kind != p.VAR_KEYWORD]
+        if "ti_key" in args:

Review comment:
       I think this should be good enough. Can’t think of all the future possibilities but this should be strict enough, if we always _call_ `get_link` with keyword `ti_key` (we already do here).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org