You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/10/03 15:20:54 UTC

[airflow] branch master updated: Add operator link to access DAG triggered by TriggerDagRunOperator (#11254)

This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new ff1ab97  Add operator link to access DAG triggered by TriggerDagRunOperator (#11254)
ff1ab97 is described below

commit ff1ab9714dc57de05a9a8695753f73e7ac18f074
Author: Tomek Urbaszek <tu...@gmail.com>
AuthorDate: Sat Oct 3 17:19:46 2020 +0200

    Add operator link to access DAG triggered by TriggerDagRunOperator (#11254)
    
    This commit adds TriggerDagRunLink, which allows users to easily
    access in the Web UI a DAG triggered by TriggerDagRunOperator.
---
 airflow/operators/dagrun_operator.py        | 22 +++++++++++++++++++++-
 airflow/serialization/serialized_objects.py |  3 ++-
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index 65fe4d5..19c8468 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -18,14 +18,27 @@
 
 import datetime
 from typing import Dict, Optional, Union
+from urllib.parse import quote
 
 from airflow.api.common.experimental.trigger_dag import trigger_dag
-from airflow.models import BaseOperator, DagRun
+from airflow.models import BaseOperator, BaseOperatorLink, DagRun
 from airflow.utils import timezone
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.types import DagRunType
 
 
+class TriggerDagRunLink(BaseOperatorLink):
+    """
+    Operator link for TriggerDagRunOperator. It allows users to access
+    DAG triggered by task using TriggerDagRunOperator.
+    """
+
+    name = 'Triggered DAG'
+
+    def get_link(self, operator, dttm):
+        return f"/graph?dag_id={operator.trigger_dag_id}&root=&execution_date={quote(dttm.isoformat())}"
+
+
 class TriggerDagRunOperator(BaseOperator):
     """
     Triggers a DAG run for a specified ``dag_id``
@@ -41,6 +54,13 @@ class TriggerDagRunOperator(BaseOperator):
     template_fields = ("trigger_dag_id", "execution_date", "conf")
     ui_color = "#ffefeb"
 
+    @property
+    def operator_extra_links(self):
+        """
+        Return operator extra links
+        """
+        return [TriggerDagRunLink()]
+
     @apply_defaults
     def __init__(
         self,
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 56c104a..fbfb0f2 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -53,7 +53,8 @@ BUILTIN_OPERATOR_EXTRA_LINKS: List[str] = [
     "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink",
     "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink",
     "airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink",
-    "airflow.providers.qubole.operators.qubole.QDSLink"
+    "airflow.providers.qubole.operators.qubole.QDSLink",
+    "airflow.operators.dagrun_operator.TriggerDagRunLink",
 ]