You are viewing a plain text version of this content. The canonical (HTML) version of this message is available in the mailing list archive; the hyperlink was lost in this plain-text rendering.
Posted to commits@airflow.apache.org by ds...@apache.org on 2022/07/22 17:35:42 UTC

[airflow] branch main updated: Add datasets to dag dependencies view (#25175)

This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 109b260920 Add datasets to dag dependencies view (#25175)
109b260920 is described below

commit 109b260920d816c5182445a2530a86d89593940f
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Fri Jul 22 10:35:26 2022 -0700

    Add datasets to dag dependencies view (#25175)
    
    Also, remove config option `scheduler > dependency_detector`.  See discussion in mailing list vote: https://lists.apache.org/thread/sd84gvxghocnjjgxz4p69qrb1t6k08z5
---
 airflow/config_templates/config.yml                |   6 --
 airflow/config_templates/default_airflow.cfg       |   3 -
 airflow/serialization/serialized_objects.py        | 102 ++++++++++++++++-----
 airflow/www/static/css/graph.css                   |   9 ++
 .../www/templates/airflow/dag_dependencies.html    |   1 +
 airflow/www/views.py                               |  24 ++---
 6 files changed, 102 insertions(+), 43 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index b2c1b4e846..13c60cd86b 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2121,12 +2121,6 @@
       type: string
       example: ~
       default: "False"
-    - name: dependency_detector
-      description: DAG dependency detector class to use
-      version_added: 2.1.0
-      type: string
-      example: ~
-      default: "airflow.serialization.serialized_objects.DependencyDetector"
     - name: trigger_timeout_check_interval
       description: |
         How often to check for expired trigger requests that have not run yet.
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index b90f242d41..cd2913b9da 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1074,9 +1074,6 @@ use_job_schedule = True
 # Only has effect if schedule_interval is set to None in DAG
 allow_trigger_in_future = False
 
-# DAG dependency detector class to use
-dependency_detector = airflow.serialization.serialized_objects.DependencyDetector
-
 # How often to check for expired trigger requests that have not run yet.
 trigger_timeout_check_interval = 15
 
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 201d27b1c8..e24aa826a4 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -20,6 +20,7 @@
 import datetime
 import enum
 import logging
+import warnings
 import weakref
 from dataclasses import dataclass
 from inspect import Parameter, signature
@@ -554,27 +555,61 @@ class BaseSerialization:
 
 
 class DependencyDetector:
-    """Detects dependencies between DAGs."""
+    """
+    Detects dependencies between DAGs.
+
+    :meta private:
+    """
 
     @staticmethod
-    def detect_task_dependencies(task: Operator) -> Optional['DagDependency']:
+    def detect_task_dependencies(task: Operator) -> List['DagDependency']:
         """Detects dependencies caused by tasks"""
+        deps = []
         if isinstance(task, TriggerDagRunOperator):
-            return DagDependency(
-                source=task.dag_id,
-                target=getattr(task, "trigger_dag_id"),
-                dependency_type="trigger",
-                dependency_id=task.task_id,
+            deps.append(
+                DagDependency(
+                    source=task.dag_id,
+                    target=getattr(task, "trigger_dag_id"),
+                    dependency_type="trigger",
+                    dependency_id=task.task_id,
+                )
             )
         elif isinstance(task, ExternalTaskSensor):
-            return DagDependency(
-                source=getattr(task, "external_dag_id"),
-                target=task.dag_id,
-                dependency_type="sensor",
-                dependency_id=task.task_id,
+            deps.append(
+                DagDependency(
+                    source=getattr(task, "external_dag_id"),
+                    target=task.dag_id,
+                    dependency_type="sensor",
+                    dependency_id=task.task_id,
+                )
             )
+        for obj in getattr(task, '_outlets', []):
+            if isinstance(obj, Dataset):
+                deps.append(
+                    DagDependency(
+                        source=task.dag_id,
+                        target='dataset',
+                        dependency_type='dataset',
+                        dependency_id=obj.uri,
+                    )
+                )
+        return deps
 
-        return None
+    @staticmethod
+    def detect_dag_dependencies(dag: Optional[DAG]) -> List["DagDependency"]:
+        """Detects dependencies set directly on the DAG object."""
+        if dag and dag.schedule_on:
+            return [
+                DagDependency(
+                    source="dataset",
+                    target=dag.dag_id,
+                    dependency_type="dataset",
+                    dependency_id=x.uri,
+                )
+                for x in dag.schedule_on
+            ]
+        else:
+            return []
 
 
 class SerializedBaseOperator(BaseOperator, BaseSerialization):
@@ -592,8 +627,6 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
         if v.default is not v.empty
     }
 
-    dependency_detector = conf.getimport('scheduler', 'dependency_detector')
-
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         # task_type is used by UI to display the correct class type, because UI only
@@ -837,9 +870,23 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
         return op
 
     @classmethod
-    def detect_dependencies(cls, op: Operator) -> Optional['DagDependency']:
+    def detect_dependencies(cls, op: Operator) -> Set['DagDependency']:
         """Detects between DAG dependencies for the operator."""
-        return cls.dependency_detector.detect_task_dependencies(op)
+        dependency_detector = DependencyDetector()
+        custom_dependency_detector = conf.getimport('scheduler', 'dependency_detector', fallback=None)
+        deps = set()
+        if not (custom_dependency_detector is None or type(dependency_detector) is DependencyDetector):
+            warnings.warn(
+                "Use of a custom dependency detector is deprecated. "
+                "Support will be removed in a future release.",
+                DeprecationWarning,
+            )
+            dep = custom_dependency_detector.detect_task_dependencies(op)
+            if type(dep) is DagDependency:
+                deps.add(dep)
+        deps.update(dependency_detector.detect_task_dependencies(op))
+        deps.update(dependency_detector.detect_dag_dependencies(op.dag))
+        return deps
 
     @classmethod
     def _is_excluded(cls, var: Any, attrname: str, op: "DAGNode"):
@@ -1010,11 +1057,14 @@ class SerializedDAG(DAG, BaseSerialization):
                 del serialized_dag["timetable"]
 
             serialized_dag["tasks"] = [cls._serialize(task) for _, task in dag.task_dict.items()]
-            serialized_dag["dag_dependencies"] = [
-                vars(t)
-                for t in (SerializedBaseOperator.detect_dependencies(task) for task in dag.task_dict.values())
+            dag_deps = [
+                t.__dict__
+                for task in dag.task_dict.values()
+                for t in SerializedBaseOperator.detect_dependencies(task)
                 if t is not None
             ]
+
+            serialized_dag["dag_dependencies"] = dag_deps
             serialized_dag['_task_group'] = SerializedTaskGroup.serialize_task_group(dag.task_group)
 
             # Edge info in the JSON exactly matches our internal structure
@@ -1208,12 +1258,20 @@ class DagDependency:
     source: str
     target: str
     dependency_type: str
-    dependency_id: str
+    dependency_id: Optional[str] = None
 
     @property
     def node_id(self):
         """Node ID for graph rendering"""
-        return f"{self.dependency_type}:{self.source}:{self.target}:{self.dependency_id}"
+        val = f"{self.dependency_type}"
+        if not self.dependency_type == 'dataset':
+            val += f":{self.source}:{self.target}"
+        if self.dependency_id:
+            val += f":{self.dependency_id}"
+        return val
+
+    def __hash__(self):
+        return hash((self.source, self.target, self.dependency_type, self.dependency_id))
 
 
 def _has_kubernetes() -> bool:
diff --git a/airflow/www/static/css/graph.css b/airflow/www/static/css/graph.css
index 5d30cb43e4..5dc33fe2f9 100644
--- a/airflow/www/static/css/graph.css
+++ b/airflow/www/static/css/graph.css
@@ -161,6 +161,11 @@ g.node text {
   background-color: #e6f1f2;
 }
 
+.legend-item.dataset {
+  float: left;
+  background-color: #fcecd4;
+}
+
 g.node.dag rect {
   fill: #e8f7e4;
 }
@@ -172,3 +177,7 @@ g.node.trigger rect {
 g.node.sensor rect {
   fill: #e6f1f2;
 }
+
+g.node.dataset rect {
+  fill: #fcecd4;
+}
diff --git a/airflow/www/templates/airflow/dag_dependencies.html b/airflow/www/templates/airflow/dag_dependencies.html
index c893c75bd0..3d25e1ce74 100644
--- a/airflow/www/templates/airflow/dag_dependencies.html
+++ b/airflow/www/templates/airflow/dag_dependencies.html
@@ -43,6 +43,7 @@ under the License.
     <span class="legend-item dag">dag</span>
     <span class="legend-item trigger">trigger</span>
     <span class="legend-item sensor">sensor</span>
+    <span class="legend-item dataset">dataset</span>
   </div>
   <div style="float:right">Last refresh: <time datetime="{{ last_refresh }}">{{ last_refresh }}</time></div>
 </div>
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 8681127f46..52540204b9 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -5320,24 +5320,24 @@ class DagDependenciesView(AirflowBaseView):
 
     def _calculate_graph(self):
 
-        nodes: List[Dict[str, Any]] = []
-        edges: List[Dict[str, str]] = []
+        nodes_dict: Dict[str, Any] = {}
+        edge_tuples: Set[Dict[str, str]] = set()
 
         for dag, dependencies in SerializedDagModel.get_dag_dependencies().items():
             dag_node_id = f"dag:{dag}"
-            nodes.append(self._node_dict(dag_node_id, dag, "dag"))
+            if dag_node_id not in nodes_dict:
+                nodes_dict[dag_node_id] = self._node_dict(dag_node_id, dag, "dag")
 
             for dep in dependencies:
-                nodes.append(self._node_dict(dep.node_id, dep.dependency_id, dep.dependency_type))
-                edges.extend(
-                    [
-                        {"u": f"dag:{dep.source}", "v": dep.node_id},
-                        {"u": dep.node_id, "v": f"dag:{dep.target}"},
-                    ]
-                )
+                if dep.node_id not in nodes_dict:
+                    nodes_dict[dep.node_id] = self._node_dict(
+                        dep.node_id, dep.dependency_id, dep.dependency_type
+                    )
+                edge_tuples.add((f"dag:{dep.source}", dep.node_id))
+                edge_tuples.add((dep.node_id, f"dag:{dep.target}"))
 
-        self.nodes = nodes
-        self.edges = edges
+        self.nodes = list(nodes_dict.values())
+        self.edges = [{"u": u, "v": v} for u, v in edge_tuples]
 
     @staticmethod
     def _node_dict(node_id, label, node_class):