You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/05/07 16:16:47 UTC

[airflow] branch master updated: Create cross-DAG dependencies view (#13199)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new db0e942  Create cross-DAG dependencies view (#13199)
db0e942 is described below

commit db0e9425c6319d71c87554e0382b2f3852a7a10f
Author: Marcin SzymaƄski <ms...@gmail.com>
AuthorDate: Fri May 7 17:16:27 2021 +0100

    Create cross-DAG dependencies view (#13199)
    
    This adds a new view for displaying dependencies between DAGs. It's based on the
    DAG Dependencies plugin (https://github.com/ms32035/airflow-dag-dependencies)
    and has been updated to work with Airflow 2.0.
    
    Since quite a bit of code is common with DAG graph view, that has been
    externalized to a new module.
    
    Unlike the external plugin this is uses the DAG serializater to store
    dependencies at parse time, meaning it doesn't need to load all DAGs.
    
    Co-authored-by: Brent Bovenzi <br...@gmail.com>
    Co-authored-by: Ash Berlin-Taylor <as...@firemirror.com>
---
 airflow/models/serialized_dag.py                   |  37 +++-
 airflow/security/permissions.py                    |   1 +
 airflow/serialization/schema.json                  |   9 +-
 airflow/serialization/serialized_objects.py        |  57 +++++-
 airflow/www/extensions/init_views.py               |   5 +
 airflow/www/security.py                            |   1 +
 airflow/www/static/css/graph.css                   |  27 +++
 airflow/www/static/js/dag_dependencies.js          | 195 +++++++++++++++++++++
 .../www/templates/airflow/dag_dependencies.html    |  75 ++++++++
 airflow/www/views.py                               |  73 ++++++++
 airflow/www/webpack.config.js                      |   1 +
 docs/apache-airflow/concepts/dags.rst              |  18 ++
 setup.cfg                                          |   1 +
 tests/serialization/test_dag_serialization.py      |   9 +-
 tests/www/test_security.py                         |   1 +
 tests/www/test_views.py                            |   8 +-
 16 files changed, 512 insertions(+), 6 deletions(-)

diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index c2706f4..81448b6 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -27,12 +27,13 @@ import sqlalchemy_jsonfield
 from sqlalchemy import BigInteger, Column, Index, String, and_
 from sqlalchemy.orm import Session, backref, foreign, relationship
 from sqlalchemy.sql import exists
+from sqlalchemy.sql.expression import func
 
 from airflow.models.base import ID_LEN, Base
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dagcode import DagCode
 from airflow.models.dagrun import DagRun
-from airflow.serialization.serialized_objects import SerializedDAG
+from airflow.serialization.serialized_objects import DagDependency, SerializedDAG
 from airflow.settings import MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json
 from airflow.utils import timezone
 from airflow.utils.session import provide_session
@@ -274,6 +275,17 @@ class SerializedDagModel(Base):
 
     @classmethod
     @provide_session
+    def get_max_last_updated_datetime(cls, session: Session = None) -> datetime:
+        """
+        Get the maximum date when any DAG was last updated in serialized_dag table
+
+        :param session: ORM Session
+        :type session: Session
+        """
+        return session.query(func.max(cls.last_updated)).scalar()
+
+    @classmethod
+    @provide_session
     def get_latest_version_hash(cls, dag_id: str, session: Session = None) -> str:
         """
         Get the latest DAG version for a given DAG ID.
@@ -286,3 +298,26 @@ class SerializedDagModel(Base):
         :rtype: str
         """
         return session.query(cls.dag_hash).filter(cls.dag_id == dag_id).scalar()
+
+    @classmethod
+    @provide_session
+    def get_dag_dependencies(cls, session: Session = None) -> Dict[str, List['DagDependency']]:
+        """
+        Get the dependencies between DAGs
+
+        :param session: ORM Session
+        :type session: Session
+        """
+        dependencies = {}
+
+        if session.bind.dialect.name in ["sqlite", "mysql"]:
+            for row in session.query(cls.dag_id, func.json_extract(cls.data, "$.dag.dag_dependencies")).all():
+                dependencies[row[0]] = [DagDependency(**d) for d in json.loads(row[1])]
+
+        else:
+            for row in session.query(
+                cls.dag_id, func.json_extract_path(cls.data, "dag", "dag_dependencies")
+            ).all():
+                dependencies[row[0]] = [DagDependency(**d) for d in row[1]]
+
+        return dependencies
diff --git a/airflow/security/permissions.py b/airflow/security/permissions.py
index 115f25a..ae864a0 100644
--- a/airflow/security/permissions.py
+++ b/airflow/security/permissions.py
@@ -27,6 +27,7 @@ RESOURCE_DOCS_MENU = "Docs"
 RESOURCE_DOCS = "Documentation"
 RESOURCE_CONFIG = "Configurations"
 RESOURCE_CONNECTION = "Connections"
+RESOURCE_DAG_DEPENDENCIES = "DAG Dependencies"
 RESOURCE_DAG_CODE = "DAG Code"
 RESOURCE_DAG_RUN = "DAG Runs"
 RESOURCE_IMPORT_ERROR = "ImportError"
diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json
index fc79238..e87c4aa 100644
--- a/airflow/serialization/schema.json
+++ b/airflow/serialization/schema.json
@@ -66,6 +66,12 @@
         "maxProperties": 1
       }
     },
+    "dag_dependencies": {
+      "type": "array",
+      "items": {
+        "type": "object"
+      }
+    },
     "dag": {
       "type": "object",
       "properties": {
@@ -104,7 +110,8 @@
           { "type": "null" },
           { "$ref": "#/definitions/task_group" }
         ]},
-        "edge_info": { "$ref": "#/definitions/edge_info" }
+        "edge_info": { "$ref": "#/definitions/edge_info" },
+        "dag_dependencies": { "$ref": "#/definitions/dag_dependencies" }
       },
       "required": [
         "_dag_id",
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 645844a..9059728 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -19,6 +19,7 @@
 import datetime
 import enum
 import logging
+from dataclasses import dataclass
 from inspect import Parameter, signature
 from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Union
 
@@ -302,8 +303,7 @@ class BaseSerialization:
         elif type_ == DAT.SET:
             return {cls._deserialize(v) for v in var}
         elif type_ == DAT.TUPLE:
-            # pylint: disable=consider-using-generator
-            return tuple([cls._deserialize(v) for v in var])
+            return tuple(cls._deserialize(v) for v in var)
         else:
             raise TypeError(f'Invalid type {type_!s} in deserialization.')
 
@@ -344,6 +344,30 @@ class BaseSerialization:
         return False
 
 
+class DependencyDetector:
+    """Detects dependencies between DAGs."""
+
+    @staticmethod
+    def detect_task_dependencies(task: BaseOperator) -> Optional['DagDependency']:
+        """Detects dependencies caused by tasks"""
+        if task.task_type == "TriggerDagRunOperator":
+            return DagDependency(
+                source=task.dag_id,
+                target=getattr(task, "trigger_dag_id"),
+                dependency_type="trigger",
+                dependency_id=task.task_id,
+            )
+        elif task.task_type == "ExternalTaskSensor":
+            return DagDependency(
+                source=getattr(task, "external_dag_id"),
+                target=task.dag_id,
+                dependency_type="sensor",
+                dependency_id=task.task_id,
+            )
+
+        return None
+
+
 class SerializedBaseOperator(BaseOperator, BaseSerialization):
     """A JSON serializable representation of operator.
 
@@ -359,6 +383,8 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
         if v.default is not v.empty
     }
 
+    dependency_detector = DependencyDetector
+
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         # task_type is used by UI to display the correct class type, because UI only
@@ -504,6 +530,11 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
         return op
 
     @classmethod
+    def detect_dependencies(cls, op: BaseOperator) -> Optional['DagDependency']:
+        """Detects between DAG dependencies for the operator."""
+        return cls.dependency_detector.detect_task_dependencies(op)
+
+    @classmethod
     def _is_excluded(cls, var: Any, attrname: str, op: BaseOperator):
         if var is not None and op.has_dag() and attrname.endswith("_date"):
             # If this date is the same as the matching field in the dag, then
@@ -657,6 +688,11 @@ class SerializedDAG(DAG, BaseSerialization):
             serialize_dag = cls.serialize_to_json(dag, cls._decorated_fields)
 
             serialize_dag["tasks"] = [cls._serialize(task) for _, task in dag.task_dict.items()]
+            serialize_dag["dag_dependencies"] = [
+                vars(t)
+                for t in (SerializedBaseOperator.detect_dependencies(task) for task in dag.task_dict.values())
+                if t is not None
+            ]
             serialize_dag['_task_group'] = SerializedTaskGroup.serialize_task_group(dag.task_group)
 
             # Edge info in the JSON exactly matches our internal structure
@@ -821,3 +857,20 @@ class SerializedTaskGroup(TaskGroup, BaseSerialization):
         group.upstream_task_ids = set(cls._deserialize(encoded_group["upstream_task_ids"]))
         group.downstream_task_ids = set(cls._deserialize(encoded_group["downstream_task_ids"]))
         return group
+
+
+@dataclass
+class DagDependency:
+    """Dataclass for representing dependencies between DAGs.
+    These are calculated during serialization and attached to serialized DAGs.
+    """
+
+    source: str
+    target: str
+    dependency_type: str
+    dependency_id: str
+
+    @property
+    def node_id(self):
+        """Node ID for graph rendering"""
+        return f"{self.dependency_type}:{self.source}:{self.target}:{self.dependency_id}"
diff --git a/airflow/www/extensions/init_views.py b/airflow/www/extensions/init_views.py
index 14e532b..b6bebfd 100644
--- a/airflow/www/extensions/init_views.py
+++ b/airflow/www/extensions/init_views.py
@@ -97,6 +97,11 @@ def init_appbuilder_views(app):
     appbuilder.add_view(
         views.XComModelView, permissions.RESOURCE_XCOM, category=permissions.RESOURCE_ADMIN_MENU
     )
+    appbuilder.add_view(
+        views.DagDependenciesView,
+        permissions.RESOURCE_DAG_DEPENDENCIES,
+        category=permissions.RESOURCE_BROWSE_MENU,
+    )
     # add_view_no_menu to change item position.
     # I added link in extensions.init_appbuilder_links.init_appbuilder_links
     appbuilder.add_view_no_menu(views.RedocView)
diff --git a/airflow/www/security.py b/airflow/www/security.py
index 0cb7b40..8768587 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -69,6 +69,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):  # pylint: disable=
     VIEWER_PERMISSIONS = [
         (permissions.ACTION_CAN_READ, permissions.RESOURCE_AUDIT_LOG),
         (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES),
         (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE),
         (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
         (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
diff --git a/airflow/www/static/css/graph.css b/airflow/www/static/css/graph.css
index 6259d45..f4c7b94 100644
--- a/airflow/www/static/css/graph.css
+++ b/airflow/www/static/css/graph.css
@@ -165,3 +165,30 @@ g.node.removed rect {
   background-color: #f0f0f0;
   cursor: move;
 }
+
+.legend-item.dag {
+  float: left;
+  background-color: #e8f7e4;
+}
+
+.legend-item.trigger {
+  float: left;
+  background-color: #ffefeb;
+}
+
+.legend-item.sensor {
+  float: left;
+  background-color: #e6f1f2;
+}
+
+g.node.dag rect {
+  fill: #e8f7e4;
+}
+
+g.node.trigger rect {
+  fill: #ffefeb;
+}
+
+g.node.sensor rect {
+  fill: #e6f1f2;
+}
diff --git a/airflow/www/static/js/dag_dependencies.js b/airflow/www/static/js/dag_dependencies.js
new file mode 100644
index 0000000..786d337
--- /dev/null
+++ b/airflow/www/static/js/dag_dependencies.js
@@ -0,0 +1,195 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+  global d3, localStorage, dagreD3, nodes, edges, arrange, document,
+*/
+
+const highlightColor = '#000000';
+const upstreamColor = '#2020A0';
+const downstreamColor = '#0000FF';
+const initialStrokeWidth = '3px';
+const highlightStrokeWidth = '5px';
+const duration = 500;
+
+// Preparation of DagreD3 data structures
+const g = new dagreD3.graphlib.Graph()
+  .setGraph({
+    nodesep: 15,
+    ranksep: 15,
+    rankdir: arrange,
+  })
+  .setDefaultEdgeLabel(() => ({ lineInterpolate: 'basis' }));
+
+// Set all nodes and styles
+nodes.forEach((node) => {
+  g.setNode(node.id, node.value);
+});
+
+// Set edges
+edges.forEach((edge) => {
+  g.setEdge(edge.u, edge.v);
+});
+
+const render = dagreD3.render();
+const svg = d3.select('#graph-svg');
+const innerSvg = d3.select('#graph-svg g');
+
+innerSvg.call(render, g);
+
+// Returns true if a node's id or its children's id matches search_text
+function nodeMatches(nodeId, searchText) {
+  if (nodeId.indexOf(searchText) > -1) return true;
+  return false;
+}
+
+function highlightNodes(nodes, color, strokeWidth) {
+  nodes.forEach((nodeid) => {
+    const myNode = g.node(nodeid).elem;
+    d3.select(myNode)
+      .selectAll('rect,circle')
+      .style('stroke', color)
+      .style('stroke-width', strokeWidth);
+  });
+}
+
+let zoom = null;
+
+function setUpZoomSupport() {
+  // Set up zoom support for Graph
+  zoom = d3.behavior.zoom().on('zoom', () => {
+    innerSvg.attr('transform', `translate(${d3.event.translate})scale(${d3.event.scale})`);
+  });
+  svg.call(zoom);
+
+  // Centering the DAG on load
+  // Get Dagre Graph dimensions
+  const graphWidth = g.graph().width;
+  const graphHeight = g.graph().height;
+  // Get SVG dimensions
+  const padding = 20;
+  const svgBb = svg.node().getBoundingClientRect();
+  const width = svgBb.width - padding * 2;
+  const height = svgBb.height - padding; // we are not centering the dag vertically
+
+  // Calculate applicable scale for zoom
+  const zoomScale = Math.min(
+    Math.min(width / graphWidth, height / graphHeight),
+    1.5, // cap zoom level to 1.5 so nodes are not too large
+  );
+
+  zoom.translate([(width / 2) - ((graphWidth * zoomScale) / 2) + padding, padding]);
+  zoom.scale(zoomScale);
+  zoom.event(innerSvg);
+}
+
+function setUpNodeHighlighting(focusItem = null) {
+  d3.selectAll('g.node').on('mouseover', function (d) {
+    d3.select(this).selectAll('rect').style('stroke', highlightColor);
+    highlightNodes(g.predecessors(d), upstreamColor, highlightStrokeWidth);
+    highlightNodes(g.successors(d), downstreamColor, highlightStrokeWidth);
+    const adjacentNodeNames = [d, ...g.predecessors(d), ...g.successors(d)];
+    d3.selectAll('g.nodes g.node')
+      .filter((x) => !adjacentNodeNames.includes(x))
+      .style('opacity', 0.2);
+    const adjacentEdges = g.nodeEdges(d);
+    d3.selectAll('g.edgePath')[0]
+      // eslint-disable-next-line no-underscore-dangle
+      .filter((x) => !adjacentEdges.includes(x.__data__))
+      .forEach((x) => {
+        d3.select(x).style('opacity', 0.2);
+      });
+  });
+
+  d3.selectAll('g.node').on('mouseout', function (d) {
+    d3.select(this).selectAll('rect,circle').style('stroke', null);
+    highlightNodes(g.predecessors(d), null, initialStrokeWidth);
+    highlightNodes(g.successors(d), null, initialStrokeWidth);
+    d3.selectAll('g.node')
+      .style('opacity', 1);
+    d3.selectAll('g.node rect')
+      .style('stroke-width', initialStrokeWidth);
+    d3.selectAll('g.edgePath')
+      .style('opacity', 1);
+    if (focusItem) {
+      localStorage.removeItem(focusItem);
+    }
+  });
+}
+
+function searchboxHighlighting(s) {
+  let match = null;
+
+  d3.selectAll('g.nodes g.node').forEach(function forEach(d) {
+    if (s === '') {
+      d3.select('g.edgePaths')
+        .transition().duration(duration)
+        .style('opacity', 1);
+      d3.select(this)
+        .transition().duration(duration)
+        .style('opacity', 1)
+        .selectAll('rect')
+        .style('stroke-width', initialStrokeWidth);
+    } else {
+      d3.select('g.edgePaths')
+        .transition().duration(duration)
+        .style('opacity', 0.2);
+      if (nodeMatches(d, s)) {
+        if (!match) match = this;
+        d3.select(this)
+          .transition().duration(duration)
+          .style('opacity', 1)
+          .selectAll('rect')
+          .style('stroke-width', highlightStrokeWidth);
+      } else {
+        d3.select(this)
+          .transition()
+          .style('opacity', 0.2).duration(duration)
+          .selectAll('rect')
+          .style('stroke-width', initialStrokeWidth);
+      }
+    }
+  });
+
+  // This moves the matched node to the center of the graph area
+  if (match) {
+    const transform = d3.transform(d3.select(match).attr('transform'));
+
+    const svgBb = svg.node().getBoundingClientRect();
+    transform.translate = [
+      svgBb.width / 2 - transform.translate[0],
+      svgBb.height / 2 - transform.translate[1],
+    ];
+    transform.scale = [1, 1];
+
+    if (zoom !== null) {
+      zoom.translate(transform.translate);
+      zoom.scale(1);
+      zoom.event(innerSvg);
+    }
+  }
+}
+
+setUpNodeHighlighting();
+setUpZoomSupport();
+
+d3.select('#searchbox').on('keyup', () => {
+  const s = document.getElementById('searchbox').value;
+  searchboxHighlighting(s);
+});
diff --git a/airflow/www/templates/airflow/dag_dependencies.html b/airflow/www/templates/airflow/dag_dependencies.html
new file mode 100644
index 0000000..708b7fc
--- /dev/null
+++ b/airflow/www/templates/airflow/dag_dependencies.html
@@ -0,0 +1,75 @@
+{#
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+#}
+
+{% extends "airflow/main.html" %}
+
+{% block title %}Airflow - DAG Dependencies{% endblock %}
+
+{% block head_css %}
+{{ super() }}
+<link rel="stylesheet" type="text/css" href="{{ url_for_asset('graph.css') }}">
+{% endblock %}
+
+{% block content %}
+{{ super() }}
+<h2>
+  {{ title }}
+  <div class="input-group" style="float: right">
+    <input type="text" id="searchbox" class="form-control" placeholder="Search for..." onenter="null">
+  </div>
+</h2>
+<hr/>
+<div class="legend-row">
+  <div>
+    <span class="legend-item dag">dag</span>
+    <span class="legend-item trigger">trigger</span>
+    <span class="legend-item sensor">sensor</span>
+  </div>
+  <div style="float:right">Last refresh: {{ last_refresh }}</div>
+</div>
+<div id="error" style="display: none; margin-top: 10px;" class="alert alert-danger" role="alert">
+  <span class="glyphicon glyphicon-exclamation-sign" aria-hidden="true"></span>
+  <span id="error_msg">Oops.</span>
+</div>
+<br/>
+<div class="svg-wrapper">
+  <div class="graph-svg-wrap">
+    <svg id="graph-svg" width="{{ width }}" height="{{ height }}">
+      <g id="dig" transform="translate(20,20)"></g>
+      <filter id="blur-effect-1">
+        <feGaussianBlur stdDeviation="3"></feGaussianBlur>
+      </filter>
+    </svg>
+  </div>
+</div>
+{% endblock %}
+
+{% block tail %}
+  {{ super() }}
+  <script src="{{ url_for_asset('d3.min.js') }}"></script>
+  <script src="{{ url_for_asset('dagre-d3.min.js') }}"></script>
+  <script src="{{ url_for_asset('d3-shape.min.js') }}"></script>
+  <script src="{{ url_for_asset('d3-tip.js') }}"></script>
+  <script>
+    let nodes = {{ nodes|tojson }};
+    let edges = {{ edges|tojson }};
+    const arrange = '{{ arrange }}';
+  </script>
+  <script src="{{ url_for_asset('dagDependencies.js') }}"></script>
+{% endblock %}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 6e38cd6..24a4506 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -97,6 +97,7 @@ from airflow.models import DAG, Connection, DagModel, DagTag, Log, SlaMiss, Task
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dagcode import DagCode
 from airflow.models.dagrun import DagRun, DagRunType
+from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance
 from airflow.providers_manager import ProvidersManager
 from airflow.security import permissions
@@ -3908,6 +3909,78 @@ class DagModelView(AirflowModelView):
         return wwwutils.json_response(payload)
 
 
+class DagDependenciesView(AirflowBaseView):
+    """View to show dependencies between DAGs"""
+
+    refresh_interval = timedelta(
+        seconds=conf.getint(
+            "webserver",
+            "dag_dependencies_refresh_interval",
+            fallback=conf.getint("scheduler", "dag_dir_list_interval"),
+        )
+    )
+    last_refresh = timezone.utcnow() - refresh_interval
+    nodes = []
+    edges = []
+
+    @expose('/dag-dependencies')
+    @auth.has_access(
+        [
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES),
+        ]
+    )
+    @gzipped
+    @action_logging
+    def list(self):
+        """Display DAG dependencies"""
+        title = "DAG Dependencies"
+
+        if timezone.utcnow() > self.last_refresh + self.refresh_interval:
+            if SerializedDagModel.get_max_last_updated_datetime() > self.last_refresh:
+                self._calculate_graph()
+            self.last_refresh = timezone.utcnow()
+
+        return self.render_template(
+            "airflow/dag_dependencies.html",
+            title=title,
+            nodes=self.nodes,
+            edges=self.edges,
+            last_refresh=self.last_refresh.strftime("%Y-%m-%d %H:%M:%S"),
+            arrange=conf.get("webserver", "dag_orientation"),
+            width=request.args.get("width", "100%"),
+            height=request.args.get("height", "800"),
+        )
+
+    def _calculate_graph(self):
+
+        nodes = []
+        edges = []
+
+        for dag, dependencies in SerializedDagModel.get_dag_dependencies().items():
+            dag_node_id = f"dag:{dag}"
+            nodes.append(self._node_dict(dag_node_id, dag, "dag"))
+
+            for dep in dependencies:
+
+                nodes.append(self._node_dict(dep.node_id, dep.dependency_id, dep.dependency_type))
+                edges.extend(
+                    [
+                        {"u": f"dag:{dep.source}", "v": dep.node_id},
+                        {"u": dep.node_id, "v": f"dag:{dep.target}"},
+                    ]
+                )
+
+        self.nodes = nodes
+        self.edges = edges
+
+    @staticmethod
+    def _node_dict(node_id, label, node_class):
+        return {
+            "id": node_id,
+            "value": {"label": label, "rx": 5, "ry": 5, "class": node_class},
+        }
+
+
 class CustomPermissionModelView(PermissionModelView):
     """Customize permission names for FAB's builtin PermissionModelView."""
 
diff --git a/airflow/www/webpack.config.js b/airflow/www/webpack.config.js
index fb7e63c..e5683b6 100644
--- a/airflow/www/webpack.config.js
+++ b/airflow/www/webpack.config.js
@@ -40,6 +40,7 @@ const config = {
     airflowDefaultTheme: `${CSS_DIR}/bootstrap-theme.css`,
     connectionForm: `${JS_DIR}/connection_form.js`,
     dagCode: `${JS_DIR}/dag_code.js`,
+    dagDependencies: `${JS_DIR}/dag_dependencies.js`,
     dags: [`${CSS_DIR}/dags.css`, `${JS_DIR}/dags.js`],
     flash: `${CSS_DIR}/flash.css`,
     gantt: [`${CSS_DIR}/gantt.css`, `${JS_DIR}/gantt.js`],
diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index b82b55f..46849ab 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -585,3 +585,21 @@ Note that packaged DAGs come with some caveats:
 * They will be inserted into Python's ``sys.path`` and importable by any other code in the Airflow process, so ensure the package names don't clash with other packages already installed on your system.
 
 In general, if you have a complex set of compiled dependencies and modules, you are likely better off using the Python ``virtualenv`` system and installing the necessary packages on your target systems with ``pip``.
+
+DAG Dependencies
+================
+
+*Added in Airflow 2.1*
+
+While dependencies between tasks in a DAG are explicitly defined through upstream and downstream
+relationships, dependencies between DAGs are a bit more complex. In general, there are two ways
+in which one DAG can depend on another:
+
+- triggering - :class:`~airflow.operators.trigger_dagrun.TriggerDagRunOperator`
+- waiting - :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor`
+
+Additional difficulty is that one DAG could wait for or trigger several runs of the other DAG
+with different execution dates. The **Dag Dependencies** view
+``Menu -> Browse -> DAG Dependencies`` helps visualize dependencies between DAGs. The dependencies
+are calculated by the scheduler during DAG serialization and the webserver uses them to build
+the dependency graph.
diff --git a/setup.cfg b/setup.cfg
index bad32b3..bc2e5d6 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -92,6 +92,7 @@ install_requires =
     connexion[swagger-ui,flask]>=2.6.0,<3
     croniter>=0.3.17, <0.4
     cryptography>=0.9.3
+    dataclasses;python_version<"3.7"
     dill>=0.2.2, <0.4
     flask>=1.1.0, <2.0
     flask-appbuilder~=3.1,>=3.1.1
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 6a186c5..c1ce92e 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -145,6 +145,7 @@ serialized_simple_dag_ground_truth = {
             },
         },
         "edge_info": {},
+        "dag_dependencies": [],
     },
 }
 
@@ -817,7 +818,13 @@ class TestStringifiedDAGs(unittest.TestCase):
         dag_schema: dict = load_dag_schema_dict()["definitions"]["dag"]["properties"]
 
         # The parameters we add manually in Serialization needs to be ignored
-        ignored_keys: set = {"is_subdag", "tasks", "has_on_success_callback", "has_on_failure_callback"}
+        ignored_keys: set = {
+            "is_subdag",
+            "tasks",
+            "has_on_success_callback",
+            "has_on_failure_callback",
+            "dag_dependencies",
+        }
         dag_params: set = set(dag_schema.keys()) - ignored_keys
         assert set(DAG.get_serialized_fields()) == dag_params
 
diff --git a/tests/www/test_security.py b/tests/www/test_security.py
index 6dabe8c..73fe4b1 100644
--- a/tests/www/test_security.py
+++ b/tests/www/test_security.py
@@ -283,6 +283,7 @@ class TestSecurity(unittest.TestCase):
         viewer_role_perms = {
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_AUDIT_LOG),
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES),
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE),
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 88bceff..e670063 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -280,7 +280,7 @@ class TestAirflowBaseViews(TestBase):
         )
 
     def test_index(self):
-        with assert_queries_count(43):
+        with assert_queries_count(44):
             resp = self.client.get('/', follow_redirects=True)
         self.check_content_in_response('DAGs', resp)
 
@@ -827,6 +827,12 @@ class TestAirflowBaseViews(TestBase):
         resp = self.client.get(url, follow_redirects=True)
         self.check_content_in_response('runme_1', resp)
 
+    def test_dag_dependencies(self):
+        url = 'dag-dependencies'
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('child_task1', resp)
+        self.check_content_in_response('test_trigger_dagrun', resp)
+
     def test_last_dagruns(self):
         resp = self.client.post('last_dagruns', follow_redirects=True)
         self.check_content_in_response('example_bash_operator', resp)